Merge pull request #2135 from AkihiroSuda/oci-content-store

content: change Writer/ReaderAt to take OCI descriptor
This commit is contained in:
Michael Crosby 2018-06-01 13:14:35 -04:00 committed by GitHub
commit 5b1f69be8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 279 additions and 169 deletions

View File

@ -214,7 +214,7 @@ func TestImagePullAllPlatforms(t *testing.T) {
} }
// check if childless data type has blob in content store // check if childless data type has blob in content store
for _, desc := range children { for _, desc := range children {
ra, err := cs.ReaderAt(ctx, desc.Digest) ra, err := cs.ReaderAt(ctx, desc)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -275,7 +275,7 @@ func TestImagePullSomePlatforms(t *testing.T) {
// check if childless data type has blob in content store // check if childless data type has blob in content store
for _, desc := range children { for _, desc := range children {
ra, err := cs.ReaderAt(ctx, desc.Digest) ra, err := cs.ReaderAt(ctx, desc)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -72,7 +72,7 @@ var (
} }
defer cancel() defer cancel()
cs := client.ContentStore() cs := client.ContentStore()
ra, err := cs.ReaderAt(ctx, dgst) ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
if err != nil { if err != nil {
return err return err
} }
@ -121,7 +121,7 @@ var (
// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect // TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
// all data to be written in a single invocation. Allow multiple writes // all data to be written in a single invocation. Allow multiple writes
// to the same transaction key followed by a commit. // to the same transaction key followed by a commit.
return content.WriteBlob(ctx, cs, ref, os.Stdin, expectedSize, expectedDigest) return content.WriteBlob(ctx, cs, ref, os.Stdin, ocispec.Descriptor{Size: expectedSize, Digest: expectedDigest})
}, },
} }
@ -314,7 +314,7 @@ var (
} }
defer cancel() defer cancel()
cs := client.ContentStore() cs := client.ContentStore()
ra, err := cs.ReaderAt(ctx, dgst) ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
if err != nil { if err != nil {
return err return err
} }
@ -326,7 +326,7 @@ var (
} }
defer nrc.Close() defer nrc.Close()
wr, err := cs.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key? wr, err := cs.Writer(ctx, content.WithRef("edit-"+object)) // TODO(stevvooe): Choose a better key?
if err != nil { if err != nil {
return err return err
} }
@ -482,7 +482,7 @@ var (
Size: info.Size, Size: info.Size,
} }
ra, err := cs.ReaderAt(ctx, dgst) ra, err := cs.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -167,7 +167,7 @@ var diffCommand = cli.Command{
} }
} }
ra, err := client.ContentStore().ReaderAt(ctx, desc.Digest) ra, err := client.ContentStore().ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -36,9 +36,9 @@ import (
"github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
protobuf "github.com/gogo/protobuf/types" protobuf "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity" "github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1" "github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -50,10 +50,9 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error { return func(ctx context.Context, client *Client, c *containers.Container) error {
var ( var (
desc = im.Target() desc = im.Target()
id = desc.Digest
store = client.ContentStore() store = client.ContentStore()
) )
index, err := decodeIndex(ctx, store, id) index, err := decodeIndex(ctx, store, desc)
if err != nil { if err != nil {
return err return err
} }
@ -80,7 +79,7 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts {
} }
c.Image = index.Annotations["image.name"] c.Image = index.Annotations["image.name"]
case images.MediaTypeContainerd1CheckpointConfig: case images.MediaTypeContainerd1CheckpointConfig:
data, err := content.ReadBlob(ctx, store, m.Digest) data, err := content.ReadBlob(ctx, store, m)
if err != nil { if err != nil {
return errors.Wrap(err, "unable to read checkpoint config") return errors.Wrap(err, "unable to read checkpoint config")
} }
@ -113,7 +112,7 @@ func WithTaskCheckpoint(im Image) NewTaskOpts {
return func(ctx context.Context, c *Client, info *TaskInfo) error { return func(ctx context.Context, c *Client, info *TaskInfo) error {
desc := im.Target() desc := im.Target()
id := desc.Digest id := desc.Digest
index, err := decodeIndex(ctx, c.ContentStore(), id) index, err := decodeIndex(ctx, c.ContentStore(), desc)
if err != nil { if err != nil {
return err return err
} }
@ -131,9 +130,9 @@ func WithTaskCheckpoint(im Image) NewTaskOpts {
} }
} }
func decodeIndex(ctx context.Context, store content.Provider, id digest.Digest) (*v1.Index, error) { func decodeIndex(ctx context.Context, store content.Provider, desc ocispec.Descriptor) (*v1.Index, error) {
var index v1.Index var index v1.Index
p, err := content.ReadBlob(ctx, store, id) p, err := content.ReadBlob(ctx, store, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -22,6 +22,7 @@ import (
"time" "time"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
) )
// ReaderAt extends the standard io.ReaderAt interface with reporting of Size and io.Closer // ReaderAt extends the standard io.ReaderAt interface with reporting of Size and io.Closer
@ -33,12 +34,16 @@ type ReaderAt interface {
// Provider provides a reader interface for specific content // Provider provides a reader interface for specific content
type Provider interface { type Provider interface {
ReaderAt(ctx context.Context, dgst digest.Digest) (ReaderAt, error) // ReaderAt only requires desc.Digest to be set.
// Other fields in the descriptor may be used internally for resolving
// the location of the actual data.
ReaderAt(ctx context.Context, dec ocispec.Descriptor) (ReaderAt, error)
} }
// Ingester writes content // Ingester writes content
type Ingester interface { type Ingester interface {
Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error) // Some implementations require WithRef to be included in opts.
Writer(ctx context.Context, opts ...WriterOpt) (Writer, error)
} }
// Info holds content specific information // Info holds content specific information
@ -142,3 +147,33 @@ func WithLabels(labels map[string]string) Opt {
return nil return nil
} }
} }
// WriterOpts is internally used by WriterOpt.
type WriterOpts struct {
Ref string
Desc ocispec.Descriptor
}
// WriterOpt is used for passing options to Ingester.Writer.
type WriterOpt func(*WriterOpts) error
// WithDescriptor specifies an OCI descriptor.
// Writer may optionally use the descriptor internally for resolving
// the location of the actual data.
// Write does not require any field of desc to be set.
// If the data size is unknown, desc.Size should be set to 0.
// Some implementations may also accept negative values as "unknown".
func WithDescriptor(desc ocispec.Descriptor) WriterOpt {
return func(opts *WriterOpts) error {
opts.Desc = desc
return nil
}
}
// WithRef specifies a ref string.
func WithRef(ref string) WriterOpt {
return func(opts *WriterOpts) error {
opts.Ref = ref
return nil
}
}

View File

@ -26,6 +26,7 @@ import (
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -45,8 +46,8 @@ func NewReader(ra ReaderAt) io.Reader {
// ReadBlob retrieves the entire contents of the blob from the provider. // ReadBlob retrieves the entire contents of the blob from the provider.
// //
// Avoid using this for large blobs, such as layers. // Avoid using this for large blobs, such as layers.
func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) { func ReadBlob(ctx context.Context, provider Provider, desc ocispec.Descriptor) ([]byte, error) {
ra, err := provider.ReaderAt(ctx, dgst) ra, err := provider.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -65,8 +66,8 @@ func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byt
// This is useful when the digest and size are known beforehand. // This is useful when the digest and size are known beforehand.
// //
// Copy is buffered, so no need to wrap reader in buffered io. // Copy is buffered, so no need to wrap reader in buffered io.
func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error { func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc ocispec.Descriptor, opts ...Opt) error {
cw, err := OpenWriter(ctx, cs, ref, size, expected) cw, err := OpenWriter(ctx, cs, WithRef(ref), WithDescriptor(desc))
if err != nil { if err != nil {
if !errdefs.IsAlreadyExists(err) { if !errdefs.IsAlreadyExists(err) {
return err return err
@ -76,19 +77,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
} }
defer cw.Close() defer cw.Close()
return Copy(ctx, cw, r, size, expected, opts...) return Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
} }
// OpenWriter opens a new writer for the given reference, retrying if the writer // OpenWriter opens a new writer for the given reference, retrying if the writer
// is locked until the reference is available or returns an error. // is locked until the reference is available or returns an error.
func OpenWriter(ctx context.Context, cs Ingester, ref string, size int64, expected digest.Digest) (Writer, error) { func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, error) {
var ( var (
cw Writer cw Writer
err error err error
retry = 16 retry = 16
) )
for { for {
cw, err = cs.Writer(ctx, ref, size, expected) cw, err = cs.Writer(ctx, opts...)
if err != nil { if err != nil {
if !errdefs.IsUnavailable(err) { if !errdefs.IsUnavailable(err) {
return nil, err return nil, err

View File

@ -34,6 +34,7 @@ import (
"github.com/containerd/containerd/filters" "github.com/containerd/containerd/filters"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -119,15 +120,15 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]strin
} }
// ReaderAt returns an io.ReaderAt for the blob. // ReaderAt returns an io.ReaderAt for the blob.
func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
p := s.blobPath(dgst) p := s.blobPath(desc.Digest)
fi, err := os.Stat(p) fi, err := os.Stat(p)
if err != nil { if err != nil {
if !os.IsNotExist(err) { if !os.IsNotExist(err) {
return nil, err return nil, err
} }
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
} }
fp, err := os.Open(p) fp, err := os.Open(p)
@ -136,7 +137,7 @@ func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.Reade
return nil, err return nil, err
} }
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
} }
return sizeReaderAt{size: fi.Size(), fp: fp}, nil return sizeReaderAt{size: fi.Size(), fp: fp}, nil
@ -400,11 +401,22 @@ func (s *store) total(ingestPath string) int64 {
// ref at a time. // ref at a time.
// //
// The argument `ref` is used to uniquely identify a long-lived writer transaction. // The argument `ref` is used to uniquely identify a long-lived writer transaction.
func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
if wOpts.Ref == "" {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
}
var lockErr error var lockErr error
for count := uint64(0); count < 10; count++ { for count := uint64(0); count < 10; count++ {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count))) time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count)))
if err := tryLock(ref); err != nil { if err := tryLock(wOpts.Ref); err != nil {
if !errdefs.IsUnavailable(err) { if !errdefs.IsUnavailable(err) {
return nil, err return nil, err
} }
@ -420,9 +432,9 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
return nil, lockErr return nil, lockErr
} }
w, err := s.writer(ctx, ref, total, expected) w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
if err != nil { if err != nil {
unlock(ref) unlock(wOpts.Ref)
return nil, err return nil, err
} }

View File

@ -39,6 +39,7 @@ import (
"github.com/containerd/containerd/pkg/testutil" "github.com/containerd/containerd/pkg/testutil"
"github.com/gotestyourself/gotestyourself/assert" "github.com/gotestyourself/gotestyourself/assert"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
) )
type memoryLabelStore struct { type memoryLabelStore struct {
@ -108,7 +109,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal("ingest dir should be created", err) t.Fatal("ingest dir should be created", err)
} }
cw, err := cs.Writer(ctx, "myref", 0, "") cw, err := cs.Writer(ctx, content.WithRef("myref"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -117,13 +118,13 @@ func TestContentWriter(t *testing.T) {
} }
// reopen, so we can test things // reopen, so we can test things
cw, err = cs.Writer(ctx, "myref", 0, "") cw, err = cs.Writer(ctx, content.WithRef("myref"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// make sure that second resume also fails // make sure that second resume also fails
if _, err = cs.Writer(ctx, "myref", 0, ""); err == nil { if _, err = cs.Writer(ctx, content.WithRef("myref")); err == nil {
// TODO(stevvooe): This also works across processes. Need to find a way // TODO(stevvooe): This also works across processes. Need to find a way
// to test that, as well. // to test that, as well.
t.Fatal("no error on second resume") t.Fatal("no error on second resume")
@ -166,7 +167,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
cw, err = cs.Writer(ctx, "aref", 0, "") cw, err = cs.Writer(ctx, content.WithRef("aref"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -346,7 +347,8 @@ func checkBlobPath(t *testing.T, cs content.Store, dgst digest.Digest) string {
} }
func checkWrite(ctx context.Context, t checker, cs content.Store, dgst digest.Digest, p []byte) digest.Digest { func checkWrite(ctx context.Context, t checker, cs content.Store, dgst digest.Digest, p []byte) digest.Digest {
if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p), int64(len(p)), dgst); err != nil { if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p),
ocispec.Descriptor{Size: int64(len(p)), Digest: dgst}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -365,25 +367,25 @@ func TestWriterTruncateRecoversFromIncompleteWrite(t *testing.T) {
defer cancel() defer cancel()
ref := "ref" ref := "ref"
content := []byte("this is the content") contentB := []byte("this is the content")
total := int64(len(content)) total := int64(len(contentB))
setupIncompleteWrite(ctx, t, cs, ref, total) setupIncompleteWrite(ctx, t, cs, ref, total)
writer, err := cs.Writer(ctx, ref, total, "") writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
assert.NilError(t, err) assert.NilError(t, err)
assert.NilError(t, writer.Truncate(0)) assert.NilError(t, writer.Truncate(0))
_, err = writer.Write(content) _, err = writer.Write(contentB)
assert.NilError(t, err) assert.NilError(t, err)
dgst := digest.FromBytes(content) dgst := digest.FromBytes(contentB)
err = writer.Commit(ctx, total, dgst) err = writer.Commit(ctx, total, dgst)
assert.NilError(t, err) assert.NilError(t, err)
} }
func setupIncompleteWrite(ctx context.Context, t *testing.T, cs content.Store, ref string, total int64) { func setupIncompleteWrite(ctx context.Context, t *testing.T, cs content.Store, ref string, total int64) {
writer, err := cs.Writer(ctx, ref, total, "") writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
assert.NilError(t, err) assert.NilError(t, err)
_, err = writer.Write([]byte("bad data")) _, err = writer.Write([]byte("bad data"))

View File

@ -25,6 +25,7 @@ import (
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
protobuftypes "github.com/gogo/protobuf/types" protobuftypes "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
) )
type proxyContentStore struct { type proxyContentStore struct {
@ -88,15 +89,16 @@ func (pcs *proxyContentStore) Delete(ctx context.Context, dgst digest.Digest) er
return nil return nil
} }
func (pcs *proxyContentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { // ReaderAt ignores MediaType.
i, err := pcs.Info(ctx, dgst) func (pcs *proxyContentStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
i, err := pcs.Info(ctx, desc.Digest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &remoteReaderAt{ return &remoteReaderAt{
ctx: ctx, ctx: ctx,
digest: dgst, digest: desc.Digest,
size: i.Size, size: i.Size,
client: pcs.client, client: pcs.client,
}, nil }, nil
@ -157,14 +159,21 @@ func (pcs *proxyContentStore) ListStatuses(ctx context.Context, filters ...strin
return statuses, nil return statuses, nil
} }
func (pcs *proxyContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { // Writer ignores MediaType.
wrclient, offset, err := pcs.negotiate(ctx, ref, size, expected) func (pcs *proxyContentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
wrclient, offset, err := pcs.negotiate(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
if err != nil { if err != nil {
return nil, errdefs.FromGRPC(err) return nil, errdefs.FromGRPC(err)
} }
return &remoteWriter{ return &remoteWriter{
ref: ref, ref: wOpts.Ref,
client: wrclient, client: wrclient,
offset: offset, offset: offset,
}, nil }, nil

View File

@ -33,6 +33,7 @@ import (
"github.com/containerd/containerd/pkg/testutil" "github.com/containerd/containerd/pkg/testutil"
"github.com/gotestyourself/gotestyourself/assert" "github.com/gotestyourself/gotestyourself/assert"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -122,28 +123,28 @@ var labels = map[string]string{
func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store) { func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store) {
c1, d1 := createContent(256) c1, d1 := createContent(256)
w1, err := cs.Writer(ctx, "c1", 0, "") w1, err := cs.Writer(ctx, content.WithRef("c1"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer w1.Close() defer w1.Close()
c2, d2 := createContent(256) c2, d2 := createContent(256)
w2, err := cs.Writer(ctx, "c2", int64(len(c2)), "") w2, err := cs.Writer(ctx, content.WithRef("c2"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c2))}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer w2.Close() defer w2.Close()
c3, d3 := createContent(256) c3, d3 := createContent(256)
w3, err := cs.Writer(ctx, "c3", 0, d3) w3, err := cs.Writer(ctx, content.WithRef("c3"), content.WithDescriptor(ocispec.Descriptor{Digest: d3}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer w3.Close() defer w3.Close()
c4, d4 := createContent(256) c4, d4 := createContent(256)
w4, err := cs.Writer(ctx, "c4", int64(len(c4)), d4) w4, err := cs.Writer(ctx, content.WithRef("c4"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c4)), Digest: d4}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -226,7 +227,7 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
) )
preStart := time.Now() preStart := time.Now()
w1, err := cs.Writer(ctx, ref, 256, dgst) w1, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -247,7 +248,7 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate) checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate)
assert.NilError(t, w1.Close(), "close first writer") assert.NilError(t, w1.Close(), "close first writer")
w2, err := cs.Writer(ctx, ref, 256, dgst) w2, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -284,7 +285,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) {
c1, d1 := createContent(256) c1, d1 := createContent(256)
preStart := time.Now() preStart := time.Now()
w1, err := cs.Writer(ctx, "c1", 256, d1) w1, err := cs.Writer(ctx, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -352,7 +353,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) {
func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
c1, d1 := createContent(256) c1, d1 := createContent(256)
w1, err := cs.Writer(ctx, "c1", 256, d1) w1, err := cs.Writer(ctx, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -427,7 +428,7 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64,
limit := int64(float64(size) * tp) limit := int64(float64(size) * tp)
ref := fmt.Sprintf("ref-%d-%d", i, j) ref := fmt.Sprintf("ref-%d-%d", i, j)
w, err := cs.Writer(ctx, ref, size, d) w, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -441,7 +442,7 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64,
t.Fatal(err) t.Fatal(err)
} }
w, err = cs.Writer(ctx, ref, size, d) w, err = cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -530,7 +531,7 @@ func checkSmallBlob(ctx context.Context, t *testing.T, store content.Store) {
blobSize := int64(len(blob)) blobSize := int64(len(blob))
blobDigest := digest.FromBytes(blob) blobDigest := digest.FromBytes(blob)
// test write // test write
w, err := store.Writer(ctx, t.Name(), blobSize, blobDigest) w, err := store.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(ocispec.Descriptor{Size: blobSize, Digest: blobDigest}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -545,7 +546,7 @@ func checkSmallBlob(ctx context.Context, t *testing.T, store content.Store) {
} }
// test read. // test read.
readSize := blobSize + 1 readSize := blobSize + 1
ra, err := store.ReaderAt(ctx, blobDigest) ra, err := store.ReaderAt(ctx, ocispec.Descriptor{Digest: blobDigest})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -575,7 +576,7 @@ func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) {
ref := fmt.Sprintf("ref-%d", size) ref := fmt.Sprintf("ref-%d", size)
t1 := time.Now() t1 := time.Now()
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), ocispec.Descriptor{Size: size, Digest: d}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -585,7 +586,7 @@ func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) {
} }
defer done(ctx2) defer done(ctx2)
w, err := cs.Writer(ctx2, ref, size, d) w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -626,7 +627,7 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
ref := fmt.Sprintf("ref-%d", size) ref := fmt.Sprintf("ref-%d", size)
t1 := time.Now() t1 := time.Now()
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), ocispec.Descriptor{Size: size, Digest: d}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -643,7 +644,7 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
copy(b2[size:], extra) copy(b2[size:], extra)
d2 := digest.FromBytes(b2) d2 := digest.FromBytes(b2)
w, err := cs.Writer(ctx2, ref, size, d) w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -768,7 +769,7 @@ func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expect
return err return err
} }
b, err := content.ReadBlob(ctx, cs, d) b, err := content.ReadBlob(ctx, cs, ocispec.Descriptor{Digest: d})
if err != nil { if err != nil {
return errors.Wrap(err, "failed to read blob") return errors.Wrap(err, "failed to read blob")
} }

View File

@ -73,7 +73,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
var ocidesc ocispec.Descriptor var ocidesc ocispec.Descriptor
if err := mount.WithTempMount(ctx, mounts, func(root string) error { if err := mount.WithTempMount(ctx, mounts, func(root string) error {
ra, err := s.store.ReaderAt(ctx, desc.Digest) ra, err := s.store.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to get reader from content store") return errors.Wrap(err, "failed to get reader from content store")
} }

View File

@ -86,7 +86,11 @@ func (s *walkingDiff) Compare(ctx context.Context, lower, upper []mount.Mount, o
config.Reference = uniqueRef() config.Reference = uniqueRef()
} }
cw, err := s.store.Writer(ctx, config.Reference, 0, "") cw, err := s.store.Writer(ctx,
content.WithRef(config.Reference),
content.WithDescriptor(ocispec.Descriptor{
MediaType: config.MediaType, // most contentstore implementations just ignore this
}))
if err != nil { if err != nil {
return errors.Wrap(err, "failed to open writer") return errors.Wrap(err, "failed to open writer")
} }

View File

@ -105,7 +105,7 @@ func (s windowsDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType) return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType)
} }
ra, err := s.store.ReaderAt(ctx, desc.Digest) ra, err := s.store.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return emptyDesc, errors.Wrap(err, "failed to get reader from content store") return emptyDesc, errors.Wrap(err, "failed to get reader from content store")
} }

View File

@ -143,7 +143,7 @@ func Manifest(ctx context.Context, provider content.Provider, image ocispec.Desc
if err := Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { if err := Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
switch desc.MediaType { switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -159,7 +159,7 @@ func Manifest(ctx context.Context, provider content.Provider, image ocispec.Desc
} }
if desc.Platform == nil { if desc.Platform == nil {
p, err := content.ReadBlob(ctx, provider, manifest.Config.Digest) p, err := content.ReadBlob(ctx, provider, manifest.Config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -180,7 +180,7 @@ func Manifest(ctx context.Context, provider content.Provider, image ocispec.Desc
return nil, nil return nil, nil
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -240,7 +240,7 @@ func Platforms(ctx context.Context, provider content.Provider, image ocispec.Des
switch desc.MediaType { switch desc.MediaType {
case MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig: case MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -283,7 +283,7 @@ func Check(ctx context.Context, provider content.Provider, image ocispec.Descrip
required = append([]ocispec.Descriptor{mfst.Config}, mfst.Layers...) required = append([]ocispec.Descriptor{mfst.Config}, mfst.Layers...)
for _, desc := range required { for _, desc := range required {
ra, err := provider.ReaderAt(ctx, desc.Digest) ra, err := provider.ReaderAt(ctx, desc)
if err != nil { if err != nil {
if errdefs.IsNotFound(err) { if errdefs.IsNotFound(err) {
missing = append(missing, desc) missing = append(missing, desc)
@ -305,7 +305,7 @@ func Children(ctx context.Context, provider content.Provider, desc ocispec.Descr
var descs []ocispec.Descriptor var descs []ocispec.Descriptor
switch desc.MediaType { switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -320,7 +320,7 @@ func Children(ctx context.Context, provider content.Provider, desc ocispec.Descr
descs = append(descs, manifest.Config) descs = append(descs, manifest.Config)
descs = append(descs, manifest.Layers...) descs = append(descs, manifest.Layers...)
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -351,7 +351,7 @@ func Children(ctx context.Context, provider content.Provider, desc ocispec.Descr
// These are used to verify that a set of layers unpacked to the expected // These are used to verify that a set of layers unpacked to the expected
// values. // values.
func RootFS(ctx context.Context, provider content.Provider, configDesc ocispec.Descriptor) ([]digest.Digest, error) { func RootFS(ctx context.Context, provider content.Provider, configDesc ocispec.Descriptor) ([]digest.Digest, error) {
p, err := content.ReadBlob(ctx, provider, configDesc.Digest) p, err := content.ReadBlob(ctx, provider, configDesc)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -93,7 +93,7 @@ func blobRecord(cs content.Provider, desc ocispec.Descriptor) tarRecord {
Typeflag: tar.TypeReg, Typeflag: tar.TypeReg,
}, },
CopyTo: func(ctx context.Context, w io.Writer) (int64, error) { CopyTo: func(ctx context.Context, w io.Writer) (int64, error) {
r, err := cs.ReaderAt(ctx, desc.Digest) r, err := cs.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -140,7 +140,7 @@ func onUntarBlob(ctx context.Context, r io.Reader, store content.Ingester, name
return errors.Errorf("unsupported algorithm: %s", algo) return errors.Errorf("unsupported algorithm: %s", algo)
} }
dgst := digest.NewDigestFromHex(algo.String(), split[2]) dgst := digest.NewDigestFromHex(algo.String(), split[2])
return content.WriteBlob(ctx, store, "unknown-"+dgst.String(), r, size, dgst) return content.WriteBlob(ctx, store, "unknown-"+dgst.String(), r, ocispec.Descriptor{Size: size, Digest: dgst})
} }
// GetChildrenDescriptors returns children blob descriptors for the following supported types: // GetChildrenDescriptors returns children blob descriptors for the following supported types:
@ -175,7 +175,7 @@ func setGCRefContentLabels(ctx context.Context, store content.Store, desc ocispe
} }
return err return err
} }
ra, err := store.ReaderAt(ctx, desc.Digest) ra, err := store.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -32,6 +32,7 @@ import (
"github.com/containerd/containerd/metadata/boltutil" "github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -337,7 +338,18 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error {
} }
func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
if wOpts.Ref == "" {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
}
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -353,12 +365,12 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
) )
if err := update(ctx, cs.db, func(tx *bolt.Tx) error { if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
var shared bool var shared bool
if expected != "" { if wOpts.Desc.Digest != "" {
cbkt := getBlobBucket(tx, ns, expected) cbkt := getBlobBucket(tx, ns, wOpts.Desc.Digest)
if cbkt != nil { if cbkt != nil {
// Add content to lease to prevent other reference removals // Add content to lease to prevent other reference removals
// from effecting this object during a provided lease // from effecting this object during a provided lease
if err := addContentLease(ctx, tx, expected); err != nil { if err := addContentLease(ctx, tx, wOpts.Desc.Digest); err != nil {
return errors.Wrap(err, "unable to lease content") return errors.Wrap(err, "unable to lease content")
} }
// Return error outside of transaction to ensure // Return error outside of transaction to ensure
@ -367,18 +379,18 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
return nil return nil
} }
if st, err := cs.Store.Info(ctx, expected); err == nil { if st, err := cs.Store.Info(ctx, wOpts.Desc.Digest); err == nil {
// Ensure the expected size is the same, it is likely // Ensure the expected size is the same, it is likely
// an error if the size is mismatched but the caller // an error if the size is mismatched but the caller
// must resolve this on commit // must resolve this on commit
if size == 0 || size == st.Size { if wOpts.Desc.Size == 0 || wOpts.Desc.Size == st.Size {
shared = true shared = true
size = st.Size wOpts.Desc.Size = st.Size
} }
} }
} }
bkt, err := createIngestBucket(tx, ns, ref) bkt, err := createIngestBucket(tx, ns, wOpts.Ref)
if err != nil { if err != nil {
return err return err
} }
@ -390,7 +402,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
return err return err
} }
bref = createKey(sid, ns, ref) bref = createKey(sid, ns, wOpts.Ref)
if err := bkt.Put(bucketKeyRef, []byte(bref)); err != nil { if err := bkt.Put(bucketKeyRef, []byte(bref)); err != nil {
return err return err
} }
@ -399,7 +411,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
} }
if shared { if shared {
if err := bkt.Put(bucketKeyExpected, []byte(expected)); err != nil { if err := bkt.Put(bucketKeyExpected, []byte(wOpts.Desc.Digest)); err != nil {
return err return err
} }
} else { } else {
@ -407,19 +419,21 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
// already checked against the user metadata. The content must // already checked against the user metadata. The content must
// be committed in the namespace before it will be seen as // be committed in the namespace before it will be seen as
// available in the current namespace. // available in the current namespace.
w, err = cs.Store.Writer(ctx, bref, size, "") desc := wOpts.Desc
desc.Digest = ""
w, err = cs.Store.Writer(ctx, content.WithRef(bref), content.WithDescriptor(desc))
} }
return err return err
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
if exists { if exists {
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", wOpts.Desc.Digest)
} }
return &namespacedWriter{ return &namespacedWriter{
ctx: ctx, ctx: ctx,
ref: ref, ref: wOpts.Ref,
namespace: ns, namespace: ns,
db: cs.db, db: cs.db,
provider: cs.Store, provider: cs.Store,
@ -427,8 +441,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
w: w, w: w,
bref: bref, bref: bref,
started: time.Now(), started: time.Now(),
expected: expected, desc: wOpts.Desc,
size: size,
}, nil }, nil
} }
@ -445,10 +458,9 @@ type namespacedWriter struct {
w content.Writer w content.Writer
bref string bref string
started time.Time started time.Time
expected digest.Digest desc ocispec.Descriptor
size int64
} }
func (nw *namespacedWriter) Close() error { func (nw *namespacedWriter) Close() error {
@ -465,7 +477,7 @@ func (nw *namespacedWriter) Write(p []byte) (int, error) {
return 0, nil return 0, nil
} }
if err := nw.createAndCopy(nw.ctx, nw.size); err != nil { if err := nw.createAndCopy(nw.ctx, nw.desc); err != nil {
return 0, err return 0, err
} }
} }
@ -477,31 +489,35 @@ func (nw *namespacedWriter) Digest() digest.Digest {
if nw.w != nil { if nw.w != nil {
return nw.w.Digest() return nw.w.Digest()
} }
return nw.expected return nw.desc.Digest
} }
func (nw *namespacedWriter) Truncate(size int64) error { func (nw *namespacedWriter) Truncate(size int64) error {
if nw.w != nil { if nw.w != nil {
return nw.w.Truncate(size) return nw.w.Truncate(size)
} }
desc := nw.desc
return nw.createAndCopy(nw.ctx, size) desc.Size = size
desc.Digest = ""
return nw.createAndCopy(nw.ctx, desc)
} }
func (nw *namespacedWriter) createAndCopy(ctx context.Context, size int64) error { func (nw *namespacedWriter) createAndCopy(ctx context.Context, desc ocispec.Descriptor) error {
w, err := nw.provider.Writer(ctx, nw.bref, nw.size, "") nwDescWithoutDigest := desc
nwDescWithoutDigest.Digest = ""
w, err := nw.provider.Writer(ctx, content.WithRef(nw.bref), content.WithDescriptor(nwDescWithoutDigest))
if err != nil { if err != nil {
return err return err
} }
if size > 0 { if desc.Size > 0 {
ra, err := nw.provider.ReaderAt(ctx, nw.expected) ra, err := nw.provider.ReaderAt(ctx, nw.desc)
if err != nil { if err != nil {
return err return err
} }
defer ra.Close() defer ra.Close()
if err := content.CopyReaderAt(w, ra, size); err != nil { if err := content.CopyReaderAt(w, ra, desc.Size); err != nil {
nw.w.Close() nw.w.Close()
nw.w = nil nw.w = nil
return err return err
@ -544,14 +560,14 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
var actual digest.Digest var actual digest.Digest
if nw.w == nil { if nw.w == nil {
if size != 0 && size != nw.size { if size != 0 && size != nw.desc.Size {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.size, size) return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.desc.Size, size)
} }
if expected != "" && expected != nw.expected { if expected != "" && expected != nw.desc.Digest {
return "", errors.Errorf("%q unexpected digest", nw.ref) return "", errors.Errorf("%q unexpected digest", nw.ref)
} }
size = nw.size size = nw.desc.Size
actual = nw.expected actual = nw.desc.Digest
if getBlobBucket(tx, nw.namespace, actual) != nil { if getBlobBucket(tx, nw.namespace, actual) != nil {
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
} }
@ -601,11 +617,11 @@ func (nw *namespacedWriter) Status() (st content.Status, err error) {
if nw.w != nil { if nw.w != nil {
st, err = nw.w.Status() st, err = nw.w.Status()
} else { } else {
st.Offset = nw.size st.Offset = nw.desc.Size
st.Total = nw.size st.Total = nw.desc.Size
st.StartedAt = nw.started st.StartedAt = nw.started
st.UpdatedAt = nw.started st.UpdatedAt = nw.started
st.Expected = nw.expected st.Expected = nw.desc.Digest
} }
if err == nil { if err == nil {
st.Ref = nw.ref st.Ref = nw.ref
@ -613,11 +629,11 @@ func (nw *namespacedWriter) Status() (st content.Status, err error) {
return return
} }
func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { func (cs *contentStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
if err := cs.checkAccess(ctx, dgst); err != nil { if err := cs.checkAccess(ctx, desc.Digest); err != nil {
return nil, err return nil, err
} }
return cs.Store.ReaderAt(ctx, dgst) return cs.Store.ReaderAt(ctx, desc)
} }
func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error { func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error {

View File

@ -32,6 +32,7 @@ import (
"github.com/containerd/containerd/leases" "github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -81,7 +82,8 @@ func TestContentLeased(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil { if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob),
ocispec.Descriptor{Size: int64(len(blob)), Digest: expected}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := checkContentLeased(lctx, db, expected); err != nil { if err := checkContentLeased(lctx, db, expected); err != nil {
@ -93,7 +95,9 @@ func TestContentLeased(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil { if _, err := cs.Writer(lctx,
content.WithRef("test-2"),
content.WithDescriptor(ocispec.Descriptor{Size: int64(len(blob)), Digest: expected})); err == nil {
t.Fatal("expected already exist error") t.Fatal("expected already exist error")
} else if !errdefs.IsAlreadyExists(err) { } else if !errdefs.IsAlreadyExists(err) {
t.Fatal(err) t.Fatal(err)

View File

@ -506,7 +506,9 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps
case testContent: case testContent:
ctx := WithTransactionContext(ctx, tx) ctx := WithTransactionContext(ctx, tx)
expected := digest.FromBytes(v.data) expected := digest.FromBytes(v.data)
w, err := cs.Writer(ctx, "test-ref", int64(len(v.data)), expected) w, err := cs.Writer(ctx,
content.WithRef("test-ref"),
content.WithDescriptor(ocispec.Descriptor{Size: int64(len(v.data)), Digest: expected}))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to create writer") return nil, errors.Wrap(err, "failed to create writer")
} }

View File

@ -117,7 +117,7 @@ func WithImageConfig(image Image) SpecOpts {
) )
switch ic.MediaType { switch ic.MediaType {
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
p, err := content.ReadBlob(ctx, image.ContentStore(), ic.Digest) p, err := content.ReadBlob(ctx, image.ContentStore(), ic)
if err != nil { if err != nil {
return err return err
} }

View File

@ -44,7 +44,7 @@ func WithImageConfig(image Image) SpecOpts {
) )
switch ic.MediaType { switch ic.MediaType {
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
p, err := content.ReadBlob(ctx, image.ContentStore(), ic.Digest) p, err := content.ReadBlob(ctx, image.ContentStore(), ic)
if err != nil { if err != nil {
return err return err
} }

View File

@ -211,12 +211,12 @@ func (c *Converter) Convert(ctx context.Context, opts ...ConvertOpt) (ocispec.De
} }
ref := remotes.MakeRefKey(ctx, desc) ref := remotes.MakeRefKey(ctx, desc)
if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(mb), desc.Size, desc.Digest, content.WithLabels(labels)); err != nil { if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(mb), desc, content.WithLabels(labels)); err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config") return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config")
} }
ref = remotes.MakeRefKey(ctx, config) ref = remotes.MakeRefKey(ctx, config)
if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), config.Size, config.Digest); err != nil { if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), config); err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config") return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config")
} }
@ -265,7 +265,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
size = 0 size = 0
} }
cw, err := content.OpenWriter(ctx, c.contentStore, ref, size, desc.Digest) cw, err := content.OpenWriter(ctx, c.contentStore, content.WithRef(ref), content.WithDescriptor(desc))
if err != nil { if err != nil {
if !errdefs.IsAlreadyExists(err) { if !errdefs.IsAlreadyExists(err) {
return err return err
@ -274,7 +274,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
// TODO: Check if blob -> diff id mapping already exists // TODO: Check if blob -> diff id mapping already exists
// TODO: Check if blob empty label exists // TODO: Check if blob empty label exists
ra, err := c.contentStore.ReaderAt(ctx, desc.Digest) ra, err := c.contentStore.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -81,7 +81,7 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch") log.G(ctx).Debug("fetch")
cw, err := content.OpenWriter(ctx, ingester, MakeRefKey(ctx, desc), desc.Size, desc.Digest) cw, err := content.OpenWriter(ctx, ingester, content.WithRef(MakeRefKey(ctx, desc)), content.WithDescriptor(desc))
if err != nil { if err != nil {
if errdefs.IsAlreadyExists(err) { if errdefs.IsAlreadyExists(err) {
return nil return nil
@ -141,7 +141,7 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc
} }
defer cw.Close() defer cw.Close()
ra, err := provider.ReaderAt(ctx, desc.Digest) ra, err := provider.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -29,6 +29,7 @@ import (
"github.com/containerd/containerd/services" "github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -179,7 +180,7 @@ func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServ
return errdefs.ToGRPC(err) return errdefs.ToGRPC(err)
} }
ra, err := s.store.ReaderAt(session.Context(), req.Digest) ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest})
if err != nil { if err != nil {
return errdefs.ToGRPC(err) return errdefs.ToGRPC(err)
} }
@ -334,7 +335,9 @@ func (s *service) Write(session api.Content_WriteServer) (err error) {
log.G(ctx).Debug("(*service).Write started") log.G(ctx).Debug("(*service).Write started")
// this action locks the writer for the session. // this action locks the writer for the session.
wr, err := s.store.Writer(ctx, ref, total, expected) wr, err := s.store.Writer(ctx,
content.WithRef(ref),
content.WithDescriptor(ocispec.Descriptor{Size: total, Digest: expected}))
if err != nil { if err != nil {
return errdefs.ToGRPC(err) return errdefs.ToGRPC(err)
} }

View File

@ -45,6 +45,7 @@ import (
"github.com/containerd/containerd/services" "github.com/containerd/containerd/services"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -121,7 +122,11 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint { if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType) return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
} }
reader, err := l.store.ReaderAt(ctx, r.Checkpoint.Digest) reader, err := l.store.ReaderAt(ctx, ocispec.Descriptor{
MediaType: r.Checkpoint.MediaType,
Digest: r.Checkpoint.Digest,
Size: r.Checkpoint.Size_,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -573,7 +578,7 @@ func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime
} }
func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) { func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
writer, err := l.store.Writer(ctx, ref, 0, "") writer, err := l.store.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{MediaType: mediaType}))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -597,7 +597,7 @@ func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor
} }
func writeContent(ctx context.Context, store content.Ingester, mediaType, ref string, r io.Reader, opts ...content.Opt) (d v1.Descriptor, err error) { func writeContent(ctx context.Context, store content.Ingester, mediaType, ref string, r io.Reader, opts ...content.Opt) (d v1.Descriptor, err error) {
writer, err := store.Writer(ctx, ref, 0, "") writer, err := store.Writer(ctx, content.WithRef(ref))
if err != nil { if err != nil {
return d, err return d, err
} }

View File

@ -43,8 +43,8 @@ github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16
github.com/gotestyourself/gotestyourself 44dbf532bbf5767611f6f2a61bded572e337010a github.com/gotestyourself/gotestyourself 44dbf532bbf5767611f6f2a61bded572e337010a
github.com/google/go-cmp v0.1.0 github.com/google/go-cmp v0.1.0
# cri dependencies # #2135: cri is temporarily forked because of circular dependency. will be fixed immediately in a follow-up PR.
github.com/containerd/cri b68fb075d49aa1c2885f45f2467142666c244f4a github.com/containerd/cri 6e975823be192ad19f2ce7afcf6c57b88a991c30 https://github.com/AkihiroSuda/cri-containerd.git
github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7 github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7
github.com/blang/semver v3.1.0 github.com/blang/semver v3.1.0
github.com/containernetworking/cni v0.6.0 github.com/containernetworking/cni v0.6.0

View File

@ -254,23 +254,24 @@ func writeDockerSchema2Manifest(ctx context.Context, cs content.Ingester, manife
for i, ch := range manifest.Layers { for i, ch := range manifest.Layers {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String() labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String()
} }
if err := content.WriteBlob(ctx, cs, "manifest-"+manifestDigest.String(), manifestBytesR,
int64(len(manifestBytes)), manifestDigest, content.WithLabels(labels)); err != nil {
return nil, err
}
desc := &ocispec.Descriptor{ desc := ocispec.Descriptor{
MediaType: images.MediaTypeDockerSchema2Manifest, MediaType: images.MediaTypeDockerSchema2Manifest,
Digest: manifestDigest, Digest: manifestDigest,
Size: int64(len(manifestBytes)), Size: int64(len(manifestBytes)),
} }
if err := content.WriteBlob(ctx, cs, "manifest-"+manifestDigest.String(), manifestBytesR,
desc, content.WithLabels(labels)); err != nil {
return nil, err
}
if arch != "" || os != "" { if arch != "" || os != "" {
desc.Platform = &ocispec.Platform{ desc.Platform = &ocispec.Platform{
Architecture: arch, Architecture: arch,
OS: os, OS: os,
} }
} }
return desc, nil return &desc, nil
} }
func onUntarManifestJSON(r io.Reader) ([]manifestDotJSON, error) { func onUntarManifestJSON(r io.Reader) ([]manifestDotJSON, error) {
@ -290,7 +291,7 @@ func onUntarLayerTar(ctx context.Context, r io.Reader, cs content.Ingester, name
// name is like "foobar/layer.tar" ( guaranteed by isLayerTar() ) // name is like "foobar/layer.tar" ( guaranteed by isLayerTar() )
split := strings.Split(name, "/") split := strings.Split(name, "/")
// note: split[0] is not expected digest here // note: split[0] is not expected digest here
cw, err := cs.Writer(ctx, "layer-"+split[0], size, "") cw, err := cs.Writer(ctx, content.WithRef("layer-"+split[0]), content.WithDescriptor(ocispec.Descriptor{Size: size}))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -311,7 +312,7 @@ func onUntarDotJSON(ctx context.Context, r io.Reader, cs content.Ingester, name
config.desc.Size = size config.desc.Size = size
// name is like "foobar.json" ( guaranteed by is DotJSON() ) // name is like "foobar.json" ( guaranteed by is DotJSON() )
split := strings.Split(name, ".") split := strings.Split(name, ".")
cw, err := cs.Writer(ctx, "config-"+split[0], size, "") cw, err := cs.Writer(ctx, content.WithRef("config-"+split[0]), content.WithDescriptor(ocispec.Descriptor{Size: size}))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -116,15 +116,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
} }
// start starts the event monitor which monitors and handles all container events. It returns // start starts the event monitor which monitors and handles all container events. It returns
// a channel for the caller to wait for the event monitor to stop. start must be called after // an error channel for the caller to wait for stop errors from the event monitor.
// subscribe. // start must be called after subscribe.
func (em *eventMonitor) start() (<-chan struct{}, error) { func (em *eventMonitor) start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil { if em.ch == nil || em.errCh == nil {
return nil, errors.New("event channel is nil") panic("event channel is nil")
} }
closeCh := make(chan struct{})
backOffCheckCh := em.backOff.start() backOffCheckCh := em.backOff.start()
go func() { go func() {
defer close(errCh)
for { for {
select { select {
case e := <-em.ch: case e := <-em.ch:
@ -144,8 +145,11 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
em.backOff.enBackOff(cID, evt) em.backOff.enBackOff(cID, evt)
} }
case err := <-em.errCh: case err := <-em.errCh:
logrus.WithError(err).Error("Failed to handle event stream") // Close errCh in defer directly if there is no error.
close(closeCh) if err != nil {
logrus.WithError(err).Errorf("Failed to handle event stream")
errCh <- err
}
return return
case <-backOffCheckCh: case <-backOffCheckCh:
cIDs := em.backOff.getExpiredContainers() cIDs := em.backOff.getExpiredContainers()
@ -162,7 +166,7 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
} }
} }
}() }()
return closeCh, nil return errCh
} }
// stop stops the event monitor. It will close the event channel. // stop stops the event monitor. It will close the event channel.

View File

@ -345,7 +345,7 @@ func getImageInfo(ctx context.Context, image containerd.Image) (*imageInfo, erro
} }
id := desc.Digest.String() id := desc.Digest.String()
rb, err := content.ReadBlob(ctx, image.ContentStore(), desc.Digest) rb, err := content.ReadBlob(ctx, image.ContentStore(), desc)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to read image config from content store") return nil, errors.Wrap(err, "failed to read image config from content store")
} }

View File

@ -19,6 +19,7 @@ package server
import ( import (
"fmt" "fmt"
"io" "io"
"net/http"
"path/filepath" "path/filepath"
"time" "time"
@ -179,10 +180,7 @@ func (c *criService) Run() error {
// Start event handler. // Start event handler.
logrus.Info("Start event monitor") logrus.Info("Start event monitor")
eventMonitorCloseCh, err := c.eventMonitor.start() eventMonitorErrCh := c.eventMonitor.start()
if err != nil {
return errors.Wrap(err, "failed to start event monitor")
}
// Start snapshot stats syncer, it doesn't need to be stopped. // Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer") logrus.Info("Start snapshots syncer")
@ -195,27 +193,32 @@ func (c *criService) Run() error {
// Start streaming server. // Start streaming server.
logrus.Info("Start streaming server") logrus.Info("Start streaming server")
streamServerCloseCh := make(chan struct{}) streamServerErrCh := make(chan error)
go func() { go func() {
if err := c.streamServer.Start(true); err != nil { defer close(streamServerErrCh)
if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed {
logrus.WithError(err).Error("Failed to start streaming server") logrus.WithError(err).Error("Failed to start streaming server")
streamServerErrCh <- err
} }
close(streamServerCloseCh)
}() }()
// Set the server as initialized. GRPC services could start serving traffic. // Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set() c.initialized.Set()
var eventMonitorErr, streamServerErr error
// Stop the whole CRI service if any of the critical service exits. // Stop the whole CRI service if any of the critical service exits.
select { select {
case <-eventMonitorCloseCh: case eventMonitorErr = <-eventMonitorErrCh:
case <-streamServerCloseCh: case streamServerErr = <-streamServerErrCh:
} }
if err := c.Close(); err != nil { if err := c.Close(); err != nil {
return errors.Wrap(err, "failed to stop cri service") return errors.Wrap(err, "failed to stop cri service")
} }
// If the error is set above, err from channel must be nil here, because
<-eventMonitorCloseCh // the channel is supposed to be closed. Or else, we wait and set it.
if err := <-eventMonitorErrCh; err != nil {
eventMonitorErr = err
}
logrus.Info("Event monitor stopped") logrus.Info("Event monitor stopped")
// There is a race condition with http.Server.Serve. // There is a race condition with http.Server.Serve.
// When `Close` is called at the same time with `Serve`, `Close` // When `Close` is called at the same time with `Serve`, `Close`
@ -227,18 +230,27 @@ func (c *criService) Run() error {
// is fixed. // is fixed.
const streamServerStopTimeout = 2 * time.Second const streamServerStopTimeout = 2 * time.Second
select { select {
case <-streamServerCloseCh: case err := <-streamServerErrCh:
if err != nil {
streamServerErr = err
}
logrus.Info("Stream server stopped") logrus.Info("Stream server stopped")
case <-time.After(streamServerStopTimeout): case <-time.After(streamServerStopTimeout):
logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout) logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout)
} }
if eventMonitorErr != nil {
return errors.Wrap(eventMonitorErr, "event monitor error")
}
if streamServerErr != nil {
return errors.Wrap(streamServerErr, "stream server error")
}
return nil return nil
} }
// Stop stops the CRI service. // Close stops the CRI service.
// TODO(random-liu): Make close synchronous.
func (c *criService) Close() error { func (c *criService) Close() error {
logrus.Info("Stop CRI service") logrus.Info("Stop CRI service")
// TODO(random-liu): Make event monitor stop synchronous.
c.eventMonitor.stop() c.eventMonitor.stop()
if err := c.streamServer.Stop(); err != nil { if err := c.streamServer.Stop(); err != nil {
return errors.Wrap(err, "failed to stop stream server") return errors.Wrap(err, "failed to stop stream server")

View File

@ -4,7 +4,7 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895 github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/containerd/cgroups fe281dd265766145e943a034aa41086474ea6130 github.com/containerd/cgroups fe281dd265766145e943a034aa41086474ea6130
github.com/containerd/console cb7008ab3d8359b78c5f464cb7cf160107ad5925 github.com/containerd/console cb7008ab3d8359b78c5f464cb7cf160107ad5925
github.com/containerd/containerd 1e8b09cfc6825f7e6349884b5f76e86c1f04a5d4 github.com/containerd/containerd 1f8c612a6c7ef2fc8328c953fb660adce2bf0a80 https://github.com/AkihiroSuda/containerd.git
github.com/containerd/continuity 2d3749b4da569ac97ca63dccba5eee4f5ee2beab github.com/containerd/continuity 2d3749b4da569ac97ca63dccba5eee4f5ee2beab
github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c
github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7 github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7