Merge pull request #1228 from dmcgowan/content-testsuite
content: test suite
commit b7e0101b37
@@ -3,20 +3,11 @@ package content
 import (
     "context"
     "io"
-    "sync"
     "time"
 
     "github.com/opencontainers/go-digest"
 )
 
-var (
-    bufPool = sync.Pool{
-        New: func() interface{} {
-            return make([]byte, 1<<20)
-        },
-    }
-)
-
 type Provider interface {
     Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error)
     ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error)
@@ -5,12 +5,21 @@ import (
     "fmt"
     "io"
     "io/ioutil"
+    "sync"
 
     "github.com/containerd/containerd/errdefs"
     "github.com/opencontainers/go-digest"
     "github.com/pkg/errors"
 )
 
+var (
+    bufPool = sync.Pool{
+        New: func() interface{} {
+            return make([]byte, 1<<20)
+        },
+    }
+)
+
 // ReadBlob retrieves the entire contents of the blob from the provider.
 //
 // Avoid using this for large blobs, such as layers.
@@ -121,8 +130,3 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
 
     return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset)
 }
-
-func readFileString(path string) (string, error) {
-    p, err := ioutil.ReadFile(path)
-    return string(p), err
-}
@@ -1,4 +1,4 @@
-package content
+package local
 
 import (
     "sync"
@@ -1,4 +1,4 @@
-package content
+package local
 
 import (
     "io"
@@ -1,4 +1,4 @@
-package content
+package local
 
 import (
     "context"
@@ -8,8 +8,10 @@ import (
     "os"
     "path/filepath"
     "strconv"
+    "sync"
     "time"
 
+    "github.com/containerd/containerd/content"
     "github.com/containerd/containerd/errdefs"
     "github.com/containerd/containerd/filters"
     "github.com/containerd/containerd/log"
@@ -17,6 +19,14 @@ import (
     "github.com/pkg/errors"
 )
 
+var (
+    bufPool = sync.Pool{
+        New: func() interface{} {
+            return make([]byte, 1<<20)
+        },
+    }
+)
+
 // Store is digest-keyed store for content. All data written into the store is
 // stored under a verifiable digest.
 //
@@ -26,7 +36,7 @@ type store struct {
     root string
 }
 
-func NewStore(root string) (Store, error) {
+func NewStore(root string) (content.Store, error) {
     if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
         return nil, err
     }
@@ -36,7 +46,7 @@ func NewStore(root string) (Store, error) {
     }, nil
 }
 
-func (s *store) Info(ctx context.Context, dgst digest.Digest) (Info, error) {
+func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
     p := s.blobPath(dgst)
     fi, err := os.Stat(p)
     if err != nil {
@@ -44,14 +54,14 @@ func (s *store) Info(ctx context.Context, dgst digest.Digest) (Info, error) {
             err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
         }
 
-        return Info{}, err
+        return content.Info{}, err
     }
 
     return s.info(dgst, fi), nil
 }
 
-func (s *store) info(dgst digest.Digest, fi os.FileInfo) Info {
-    return Info{
+func (s *store) info(dgst digest.Digest, fi os.FileInfo) content.Info {
+    return content.Info{
         Digest:    dgst,
         Size:      fi.Size(),
         CreatedAt: fi.ModTime(),
@@ -93,12 +103,12 @@ func (cs *store) Delete(ctx context.Context, dgst digest.Digest) error {
     return nil
 }
 
-func (cs *store) Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error) {
+func (cs *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
     // TODO: Support persisting and updating mutable content data
-    return Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
+    return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
 }
 
-func (cs *store) Walk(ctx context.Context, fn WalkFunc, filters ...string) error {
+func (cs *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
     // TODO: Support filters
     root := filepath.Join(cs.root, "blobs")
     var alg digest.Algorithm
@@ -141,11 +151,11 @@ func (cs *store) Walk(ctx context.Context, fn WalkFunc, filters ...string) error
     })
 }
 
-func (s *store) Status(ctx context.Context, ref string) (Status, error) {
+func (s *store) Status(ctx context.Context, ref string) (content.Status, error) {
     return s.status(s.ingestRoot(ref))
 }
 
-func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]Status, error) {
+func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
     fp, err := os.Open(filepath.Join(s.root, "ingest"))
     if err != nil {
         return nil, err
@@ -163,7 +173,7 @@ func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]Status, error
         return nil, err
     }
 
-    var active []Status
+    var active []content.Status
     for _, fi := range fis {
         p := filepath.Join(s.root, "ingest", fi.Name())
         stat, err := s.status(p)
@@ -192,19 +202,19 @@ func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]Status, error
 }
 
 // status works like stat above except uses the path to the ingest.
-func (s *store) status(ingestPath string) (Status, error) {
+func (s *store) status(ingestPath string) (content.Status, error) {
     dp := filepath.Join(ingestPath, "data")
     fi, err := os.Stat(dp)
     if err != nil {
-        return Status{}, err
+        return content.Status{}, err
     }
 
     ref, err := readFileString(filepath.Join(ingestPath, "ref"))
     if err != nil {
-        return Status{}, err
+        return content.Status{}, err
     }
 
-    return Status{
+    return content.Status{
         Ref:    ref,
         Offset: fi.Size(),
         Total:  s.total(ingestPath),
@@ -213,7 +223,7 @@ func (s *store) status(ingestPath string) (Status, error) {
     }, nil
 }
 
-func adaptStatus(status Status) filters.Adaptor {
+func adaptStatus(status content.Status) filters.Adaptor {
     return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
         if len(fieldpath) == 0 {
             return "", false
@@ -248,7 +258,7 @@ func (s *store) total(ingestPath string) int64 {
 // ref at a time.
 //
 // The argument `ref` is used to uniquely identify a long-lived writer transaction.
-func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (Writer, error) {
+func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
     // TODO(stevvooe): Need to actually store expected here. We have
     // code in the service that shouldn't be dealing with this.
     if expected != "" {
@@ -384,3 +394,8 @@ func (s *store) ingestPaths(ref string) (string, string, string) {
 
     return fp, rp, dp
 }
+
+func readFileString(path string) (string, error) {
+    p, err := ioutil.ReadFile(path)
+    return string(p), err
+}
@@ -1,4 +1,4 @@
-package content
+package local
 
 import (
     "os"
@@ -1,4 +1,4 @@
-package content
+package local
 
 import (
     "bufio"
@@ -17,10 +17,22 @@ import (
     "testing"
     "time"
 
+    "github.com/containerd/containerd/content"
+    "github.com/containerd/containerd/content/testsuite"
     "github.com/containerd/containerd/testutil"
     "github.com/opencontainers/go-digest"
 )
 
+func TestContent(t *testing.T) {
+    testsuite.ContentSuite(t, "fs", func(ctx context.Context, root string) (content.Store, func(), error) {
+        cs, err := NewStore(root)
+        if err != nil {
+            return nil, nil, err
+        }
+        return cs, func() {}, nil
+    })
+}
+
 func TestContentWriter(t *testing.T) {
     ctx, tmpdir, cs, cleanup := contentStoreEnv(t)
     defer cleanup()
@@ -63,7 +75,7 @@ func TestContentWriter(t *testing.T) {
         ingestions[i].StartedAt = time.Time{}
     }
 
-    if !reflect.DeepEqual(ingestions, []Status{
+    if !reflect.DeepEqual(ingestions, []content.Status{
         {
             Ref:    "myref",
             Offset: 0,
@@ -132,7 +144,7 @@ func TestWalkBlobs(t *testing.T) {
         expected[dgst] = struct{}{}
     }
 
-    if err := cs.Walk(ctx, func(bi Info) error {
+    if err := cs.Walk(ctx, func(bi content.Info) error {
         found[bi.Digest] = struct{}{}
         checkBlobPath(t, cs, bi.Digest)
         return nil
@@ -201,7 +213,7 @@ func generateBlobs(t checker, nblobs, maxsize int64) map[digest.Digest][]byte {
     return blobs
 }
 
-func populateBlobStore(t checker, ctx context.Context, cs Store, nblobs, maxsize int64) map[digest.Digest][]byte {
+func populateBlobStore(t checker, ctx context.Context, cs content.Store, nblobs, maxsize int64) map[digest.Digest][]byte {
     blobs := generateBlobs(t, nblobs, maxsize)
 
     for dgst, p := range blobs {
@@ -211,7 +223,7 @@ func populateBlobStore(t checker, ctx context.Context, cs Store, nblobs, maxsize
     return blobs
 }
 
-func contentStoreEnv(t checker) (context.Context, string, Store, func()) {
+func contentStoreEnv(t checker) (context.Context, string, content.Store, func()) {
     pc, _, _, ok := runtime.Caller(1)
     if !ok {
         t.Fatal("failed to resolve caller")
@@ -247,7 +259,7 @@ func checkCopy(t checker, size int64, dst io.Writer, src io.Reader) {
     }
 }
 
-func checkBlobPath(t *testing.T, cs Store, dgst digest.Digest) string {
+func checkBlobPath(t *testing.T, cs content.Store, dgst digest.Digest) string {
     path := cs.(*store).blobPath(dgst)
 
     if path != filepath.Join(cs.(*store).root, "blobs", dgst.Algorithm().String(), dgst.Hex()) {
@@ -268,8 +280,8 @@ func checkBlobPath(t *testing.T, cs Store, dgst digest.Digest) string {
     return path
 }
 
-func checkWrite(t checker, ctx context.Context, cs Store, dgst digest.Digest, p []byte) digest.Digest {
-    if err := WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p), int64(len(p)), dgst); err != nil {
+func checkWrite(t checker, ctx context.Context, cs content.Store, dgst digest.Digest, p []byte) digest.Digest {
+    if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p), int64(len(p)), dgst); err != nil {
         t.Fatal(err)
     }
 
@@ -1,6 +1,6 @@
 // +build darwin freebsd
 
-package content
+package local
 
 import (
     "os"
@@ -1,4 +1,4 @@
-package content
+package local
 
 import (
     "os"
@@ -1,4 +1,4 @@
-package content
+package local
 
 import (
     "os"
@@ -6,6 +6,7 @@ import (
     "runtime"
     "time"
 
+    "github.com/containerd/containerd/content"
     "github.com/containerd/containerd/errdefs"
     "github.com/opencontainers/go-digest"
     "github.com/pkg/errors"
@@ -24,8 +25,8 @@ type writer struct {
     updatedAt time.Time
 }
 
-func (w *writer) Status() (Status, error) {
-    return Status{
+func (w *writer) Status() (content.Status, error) {
+    return content.Status{
         Ref:    w.ref,
         Offset: w.offset,
         Total:  w.total,
@@ -109,6 +110,10 @@ func (w *writer) Commit(size int64, expected digest.Digest) error {
         }
         return err
     }
+    commitTime := time.Now()
+    if err := os.Chtimes(target, commitTime, commitTime); err != nil {
+        return err
+    }
 
     unlock(w.ref)
     w.fp = nil
content/testsuite/testsuite.go (new file, 282 lines)
@@ -0,0 +1,282 @@
+package testsuite
+
+import (
+    "bytes"
+    "context"
+    "io"
+    "io/ioutil"
+    "math/rand"
+    "os"
+    "testing"
+    "time"
+
+    "github.com/containerd/containerd/content"
+    "github.com/containerd/containerd/namespaces"
+    "github.com/containerd/containerd/testutil"
+    digest "github.com/opencontainers/go-digest"
+    "github.com/pkg/errors"
+)
+
+// ContentSuite runs a test suite on the content store given a factory function.
+func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, root string) (content.Store, func(), error)) {
+    t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter))
+    t.Run("UploadStatus", makeTest(t, name, storeFn, checkUploadStatus))
+}
+
+func makeTest(t *testing.T, name string, storeFn func(ctx context.Context, root string) (content.Store, func(), error), fn func(ctx context.Context, t *testing.T, cs content.Store)) func(t *testing.T) {
+    return func(t *testing.T) {
+        ctx := namespaces.WithNamespace(context.Background(), name)
+
+        tmpDir, err := ioutil.TempDir("", "content-suite-"+name+"-")
+        if err != nil {
+            t.Fatal(err)
+        }
+        defer os.RemoveAll(tmpDir)
+
+        cs, cleanup, err := storeFn(ctx, tmpDir)
+        if err != nil {
+            t.Fatal(err)
+        }
+        defer cleanup()
+
+        defer testutil.DumpDir(t, tmpDir)
+        fn(ctx, t, cs)
+    }
+}
+
+func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store) {
+    c1, d1 := createContent(256, 1)
+    w1, err := cs.Writer(ctx, "c1", 0, "")
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    c2, d2 := createContent(256, 2)
+    w2, err := cs.Writer(ctx, "c2", int64(len(c2)), "")
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    c3, d3 := createContent(256, 3)
+    w3, err := cs.Writer(ctx, "c3", 0, d3)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    c4, d4 := createContent(256, 4)
+    w4, err := cs.Writer(ctx, "c4", int64(len(c4)), d4)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    smallbuf := make([]byte, 32)
+    for _, s := range []struct {
+        content []byte
+        digest  digest.Digest
+        writer  content.Writer
+    }{
+        {
+            content: c1,
+            digest:  d1,
+            writer:  w1,
+        },
+        {
+            content: c2,
+            digest:  d2,
+            writer:  w2,
+        },
+        {
+            content: c3,
+            digest:  d3,
+            writer:  w3,
+        },
+        {
+            content: c4,
+            digest:  d4,
+            writer:  w4,
+        },
+    } {
+        n, err := io.CopyBuffer(s.writer, bytes.NewReader(s.content), smallbuf)
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        if n != int64(len(s.content)) {
+            t.Fatalf("Unexpected copy length %d, expected %d", n, len(s.content))
+        }
+
+        preCommit := time.Now()
+        if err := s.writer.Commit(0, ""); err != nil {
+            t.Fatal(err)
+        }
+        postCommit := time.Now()
+
+        if s.writer.Digest() != s.digest {
+            t.Fatalf("Unexpected commit digest %s, expected %s", s.writer.Digest(), s.digest)
+        }
+
+        info := content.Info{
+            Digest: s.digest,
+            Size:   int64(len(s.content)),
+        }
+        if err := checkInfo(ctx, cs, s.digest, info, preCommit, postCommit, preCommit, postCommit); err != nil {
+            t.Fatalf("Check info failed: %+v", err)
+        }
+    }
+}
+
+func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) {
+    c1, d1 := createContent(256, 1)
+
+    preStart := time.Now()
+    w1, err := cs.Writer(ctx, "c1", 256, d1)
+    if err != nil {
+        t.Fatal(err)
+    }
+    postStart := time.Now()
+
+    d := digest.FromBytes([]byte{})
+
+    expected := content.Status{
+        Ref:      "c1",
+        Total:    256,
+        Expected: d1,
+    }
+    preUpdate := preStart
+    postUpdate := postStart
+
+    if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil {
+        t.Fatalf("Status check failed: %+v", err)
+    }
+
+    // Write first 64 bytes
+    preUpdate = time.Now()
+    if _, err := w1.Write(c1[:64]); err != nil {
+        t.Fatalf("Failed to write: %+v", err)
+    }
+    postUpdate = time.Now()
+    expected.Offset = 64
+    d = digest.FromBytes(c1[:64])
+    if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil {
+        t.Fatalf("Status check failed: %+v", err)
+    }
+
+    // Write next 128 bytes
+    preUpdate = time.Now()
+    if _, err := w1.Write(c1[64:192]); err != nil {
+        t.Fatalf("Failed to write: %+v", err)
+    }
+    postUpdate = time.Now()
+    expected.Offset = 192
+    d = digest.FromBytes(c1[:192])
+    if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil {
+        t.Fatalf("Status check failed: %+v", err)
+    }
+
+    // Write last 64 bytes
+    preUpdate = time.Now()
+    if _, err := w1.Write(c1[192:]); err != nil {
+        t.Fatalf("Failed to write: %+v", err)
+    }
+    postUpdate = time.Now()
+    expected.Offset = 256
+    if err := checkStatus(w1, expected, d1, preStart, postStart, preUpdate, postUpdate); err != nil {
+        t.Fatalf("Status check failed: %+v", err)
+    }
+
+    preCommit := time.Now()
+    if err := w1.Commit(0, ""); err != nil {
+        t.Fatalf("Commit failed: %+v", err)
+    }
+    postCommit := time.Now()
+
+    info := content.Info{
+        Digest: d1,
+        Size:   256,
+    }
+
+    if err := checkInfo(ctx, cs, d1, info, preCommit, postCommit, preCommit, postCommit); err != nil {
+        t.Fatalf("Check info failed: %+v", err)
+    }
+}
+
+func checkStatus(w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) error {
+    st, err := w.Status()
+    if err != nil {
+        return errors.Wrap(err, "failed to get status")
+    }
+
+    wd := w.Digest()
+    if wd != d {
+        return errors.Errorf("unexpected digest %v, expected %v", wd, d)
+    }
+
+    if st.Ref != expected.Ref {
+        return errors.Errorf("unexpected ref %q, expected %q", st.Ref, expected.Ref)
+    }
+
+    if st.Offset != expected.Offset {
+        return errors.Errorf("unexpected offset %d, expected %d", st.Offset, expected.Offset)
+    }
+
+    if st.Total != expected.Total {
+        return errors.Errorf("unexpected total %d, expected %d", st.Total, expected.Total)
+    }
+
+    // TODO: Add this test once all implementations guarantee this value is held
+    //if st.Expected != expected.Expected {
+    //    return errors.Errorf("unexpected \"expected digest\" %q, expected %q", st.Expected, expected.Expected)
+    //}
+
+    if st.StartedAt.After(postStart) || st.StartedAt.Before(preStart) {
+        return errors.Errorf("unexpected started at time %s, expected between %s and %s", st.StartedAt, preStart, postStart)
+    }
+    if st.UpdatedAt.After(postUpdate) || st.UpdatedAt.Before(preUpdate) {
+        return errors.Errorf("unexpected updated at time %s, expected between %s and %s", st.UpdatedAt, preUpdate, postUpdate)
+    }
+
+    return nil
+}
+
+func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error {
+    info, err := cs.Info(ctx, d)
+    if err != nil {
+        return errors.Wrap(err, "failed to get info")
+    }
+
+    if info.Digest != d {
+        return errors.Errorf("unexpected info digest %s, expected %s", info.Digest, d)
+    }
+
+    if info.Size != expected.Size {
+        return errors.Errorf("unexpected info size %d, expected %d", info.Size, expected.Size)
+    }
+
+    if info.CreatedAt.After(c2) || info.CreatedAt.Before(c1) {
+        return errors.Errorf("unexpected created at time %s, expected between %s and %s", info.CreatedAt, c1, c2)
+    }
+    if info.UpdatedAt.After(u2) || info.UpdatedAt.Before(u1) {
+        return errors.Errorf("unexpected updated at time %s, expected between %s and %s", info.UpdatedAt, u1, u2)
+    }
+
+    if len(info.Labels) != len(expected.Labels) {
+        return errors.Errorf("mismatched number of labels\ngot:\n%#v\nexpected:\n%#v", info.Labels, expected.Labels)
+    }
+
+    for k, v := range expected.Labels {
+        actual := info.Labels[k]
+        if v != actual {
+            return errors.Errorf("unexpected value for label %q: %q, expected %q", k, actual, v)
+        }
+    }
+
+    return nil
+}
+
+func createContent(size, seed int64) ([]byte, digest.Digest) {
+    b, err := ioutil.ReadAll(io.LimitReader(rand.New(rand.NewSource(seed)), size))
+    if err != nil {
+        panic(err)
+    }
+    return b, digest.FromBytes(b)
+}
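The suite above is exercised by the local and metadata stores later in this change. As a further illustration, here is a minimal sketch of how an out-of-tree content.Store implementation could plug into it; the mystore package name and its NewStore constructor are hypothetical and not part of this commit.

package mystore

import (
    "context"
    "testing"

    "github.com/containerd/containerd/content"
    "github.com/containerd/containerd/content/testsuite"
)

// TestMyStore runs the shared content test suite against a hypothetical
// content.Store implementation. NewStore is assumed to return a
// content.Store rooted at the given directory.
func TestMyStore(t *testing.T) {
    testsuite.ContentSuite(t, "mystore", func(ctx context.Context, root string) (content.Store, func(), error) {
        cs, err := NewStore(root) // hypothetical constructor
        if err != nil {
            return nil, nil, err
        }
        // No extra cleanup needed beyond removing root, which the suite handles.
        return cs, func() {}, nil
    })
}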
@@ -251,7 +251,12 @@ func (cs *contentStore) Status(ctx context.Context, ref string) (content.Status,
         return content.Status{}, err
     }
 
-    return cs.Store.Status(ctx, bref)
+    st, err := cs.Store.Status(ctx, bref)
+    if err != nil {
+        return content.Status{}, err
+    }
+    st.Ref = ref
+    return st, nil
 }
 
 func (cs *contentStore) Abort(ctx context.Context, ref string) error {
@@ -385,13 +390,14 @@ func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Dige
         return err
     }
 
-    timeEncoded, err := status.UpdatedAt.MarshalBinary()
+    timeEncoded, err := time.Now().UTC().MarshalBinary()
     if err != nil {
         return err
     }
 
     for _, v := range [][2][]byte{
         {bucketKeyCreatedAt, timeEncoded},
+        {bucketKeyUpdatedAt, timeEncoded},
         {bucketKeySize, sizeEncoded},
     } {
         if err := bkt.Put(v[0], v[1]); err != nil {
@@ -402,6 +408,14 @@ func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Dige
     return nil
 }
 
+func (nw *namespacedWriter) Status() (content.Status, error) {
+    st, err := nw.Writer.Status()
+    if err == nil {
+        st.Ref = nw.ref
+    }
+    return st, err
+}
+
 func (cs *contentStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
     if err := cs.checkAccess(ctx, dgst); err != nil {
         return nil, err
metadata/content_test.go (new file, 31 lines)
@@ -0,0 +1,31 @@
+package metadata
+
+import (
+    "context"
+    "path/filepath"
+    "testing"
+
+    "github.com/boltdb/bolt"
+    "github.com/containerd/containerd/content"
+    "github.com/containerd/containerd/content/local"
+    "github.com/containerd/containerd/content/testsuite"
+)
+
+func TestContent(t *testing.T) {
+    testsuite.ContentSuite(t, "metadata", func(ctx context.Context, root string) (content.Store, func(), error) {
+        // TODO: Use mocked or in-memory store
+        cs, err := local.NewStore(root)
+        if err != nil {
+            return nil, nil, err
+        }
+
+        db, err := bolt.Open(filepath.Join(root, "metadata.db"), 0660, nil)
+        if err != nil {
+            return nil, nil, err
+        }
+
+        return NewContentStore(db, cs), func() {
+            db.Close()
+        }, nil
+    })
+}
@@ -19,7 +19,7 @@ import (
     snapshot "github.com/containerd/containerd/api/services/snapshot/v1"
     tasks "github.com/containerd/containerd/api/services/tasks/v1"
     version "github.com/containerd/containerd/api/services/version/v1"
-    store "github.com/containerd/containerd/content"
+    "github.com/containerd/containerd/content/local"
     "github.com/containerd/containerd/events"
     "github.com/containerd/containerd/log"
     "github.com/containerd/containerd/plugin"
@@ -161,7 +161,7 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) {
         Type: plugin.ContentPlugin,
         ID:   "content",
         Init: func(ic *plugin.InitContext) (interface{}, error) {
-            return store.NewStore(ic.Root)
+            return local.NewStore(ic.Root)
         },
     })
     plugin.Register(&plugin.Registration{