Move content/local to plugins/content/local

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2024-01-17 09:50:23 -08:00
parent aad37698ed
commit 846f7d4e65
23 changed files with 11 additions and 11 deletions

View File

@@ -0,0 +1,76 @@
//go:build gofuzz

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"bufio"
	"bytes"
	"context"
	_ "crypto/sha256"
	"fmt"
	"io"
	"testing"

	"github.com/opencontainers/go-digest"

	"github.com/containerd/containerd/v2/content"
)

func FuzzContentStoreWriter(data []byte) int {
	t := &testing.T{}
	ctx := context.Background()
	ctx, _, cs, cleanup := contentStoreEnv(t)
	defer cleanup()

	cw, err := cs.Writer(ctx, content.WithRef("myref"))
	if err != nil {
		return 0
	}
	if err := cw.Close(); err != nil {
		return 0
	}

	// reopen, so we can test things
	cw, err = cs.Writer(ctx, content.WithRef("myref"))
	if err != nil {
		return 0
	}

	err = checkCopyFuzz(int64(len(data)), cw, bufio.NewReader(io.NopCloser(bytes.NewReader(data))))
	if err != nil {
		return 0
	}
	expected := digest.FromBytes(data)

	if err = cw.Commit(ctx, int64(len(data)), expected); err != nil {
		return 0
	}
	return 1
}

func checkCopyFuzz(size int64, dst io.Writer, src io.Reader) error {
	nn, err := io.Copy(dst, src)
	if err != nil {
		return err
	}
	if nn != size {
		return fmt.Errorf("short copy: copied %d of %d bytes", nn, size)
	}
	return nil
}
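
A hypothetical bridge (not part of this commit) from Go's native fuzzing engine to the go-fuzz style entry point above; it would live in a _test.go file and be run with `go test -tags gofuzz -fuzz FuzzLocalWriter`:

//go:build gofuzz

package local

import "testing"

func FuzzLocalWriter(f *testing.F) {
	f.Add([]byte("seed"))
	f.Fuzz(func(t *testing.T, data []byte) {
		FuzzContentStoreWriter(data)
	})
}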

View File

@@ -0,0 +1,62 @@
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"fmt"
	"sync"
	"time"

	"github.com/containerd/containerd/v2/errdefs"
)

// lock records when a reference was locked in process.
type lock struct {
	since time.Time
}

var (
	// locks lets us lock in process
	locks   = make(map[string]*lock)
	locksMu sync.Mutex
)

func tryLock(ref string) error {
	locksMu.Lock()
	defer locksMu.Unlock()

	if v, ok := locks[ref]; ok {
		// Returning the duration may help developers distinguish deadlocks (long duration) from
		// lock contention (short duration).
		now := time.Now()
		return fmt.Errorf(
			"ref %s locked for %s (since %s): %w", ref, now.Sub(v.since), v.since,
			errdefs.ErrUnavailable,
		)
	}

	locks[ref] = &lock{time.Now()}
	return nil
}

func unlock(ref string) {
	locksMu.Lock()
	defer locksMu.Unlock()

	delete(locks, ref)
}
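
A minimal sketch of the discipline these helpers imply (withRefLock is a hypothetical wrapper, not in this commit): acquire the ref lock up front and release it when the work finishes or fails.

func withRefLock(ref string, fn func() error) error {
	if err := tryLock(ref); err != nil {
		return err // errdefs.ErrUnavailable while another holder owns the ref
	}
	defer unlock(ref)
	return fn()
}

Note that Writer in store.go deliberately avoids the deferred unlock on success: ownership of the lock passes to the returned writer, which releases it in Commit or Close.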

View File

@@ -0,0 +1,34 @@
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestTryLock(t *testing.T) {
	err := tryLock("testref")
	assert.NoError(t, err)
	defer unlock("testref")

	err = tryLock("testref")
	require.NotNil(t, err)
	assert.Contains(t, err.Error(), "ref testref locked for ")
}

View File

@@ -0,0 +1,72 @@
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"fmt"
	"io"
	"os"

	"github.com/containerd/containerd/v2/content"
	"github.com/containerd/containerd/v2/errdefs"
)

// sizeReaderAt implements content.ReaderAt over an open file handle, with the
// size captured at open time. ReadAt does not touch the shared file offset,
// so reads are safe for concurrent use.
type sizeReaderAt struct {
	size int64
	fp   *os.File
}

// OpenReader creates a ReaderAt from a file
func OpenReader(p string) (content.ReaderAt, error) {
	fi, err := os.Stat(p)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
		return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound)
	}

	fp, err := os.Open(p)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
		return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound)
	}

	return sizeReaderAt{size: fi.Size(), fp: fp}, nil
}

func (ra sizeReaderAt) ReadAt(p []byte, offset int64) (int, error) {
	return ra.fp.ReadAt(p, offset)
}

func (ra sizeReaderAt) Size() int64 {
	return ra.size
}

func (ra sizeReaderAt) Close() error {
	return ra.fp.Close()
}

func (ra sizeReaderAt) Reader() io.Reader {
	return io.LimitReader(ra.fp, ra.size)
}
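
A usage sketch (readBlob is illustrative only): io.NewSectionReader adapts the offset-based ReadAt to a sequential io.Reader without moving the shared file offset, unlike the Reader method above, which reads through ra.fp directly.

func readBlob(p string) ([]byte, error) {
	ra, err := OpenReader(p)
	if err != nil {
		return nil, err
	}
	defer ra.Close()
	return io.ReadAll(io.NewSectionReader(ra, 0, ra.Size()))
}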

View File

@@ -0,0 +1,686 @@
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/containerd/containerd/v2/content"
	"github.com/containerd/containerd/v2/errdefs"
	"github.com/containerd/containerd/v2/filters"
	"github.com/containerd/log"
	"github.com/opencontainers/go-digest"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

var bufPool = sync.Pool{
	New: func() interface{} {
		buffer := make([]byte, 1<<20)
		return &buffer
	},
}

// LabelStore is used to store mutable labels for digests
type LabelStore interface {
	// Get returns all the labels for the given digest
	Get(digest.Digest) (map[string]string, error)

	// Set sets all the labels for a given digest
	Set(digest.Digest, map[string]string) error

	// Update replaces the given labels for a digest,
	// a key with an empty value removes a label.
	Update(digest.Digest, map[string]string) (map[string]string, error)
}

// Store is a digest-keyed store for content. All data written into the store
// is stored under a verifiable digest.
//
// Store can generally support multi-reader, single-writer ingest of data,
// including resumable ingest.
type store struct {
	root string
	ls   LabelStore
}

// NewStore returns a local content store
func NewStore(root string) (content.Store, error) {
	return NewLabeledStore(root, nil)
}

// NewLabeledStore returns a new content store using the provided label store
//
// Note: content stores which are used underneath a metadata store may not
// require labels and should use `NewStore`. `NewLabeledStore` is primarily
// useful for tests or standalone implementations.
func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
	if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil {
		return nil, err
	}

	return &store{
		root: root,
		ls:   ls,
	}, nil
}
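
For orientation, a minimal ingest sketch against this store (exampleIngest is illustrative only and assumes an additional bytes import):

func exampleIngest(ctx context.Context, root string, data []byte) error {
	cs, err := NewStore(root)
	if err != nil {
		return err
	}
	dgst := digest.FromBytes(data)
	desc := ocispec.Descriptor{Digest: dgst, Size: int64(len(data))}
	// content.WriteBlob drives the Writer/Commit cycle and returns nil
	// when the blob already exists in the store.
	return content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(data), desc)
}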
func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
	p, err := s.blobPath(dgst)
	if err != nil {
		return content.Info{}, fmt.Errorf("calculating blob info path: %w", err)
	}

	fi, err := os.Stat(p)
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("content %v: %w", dgst, errdefs.ErrNotFound)
		}
		return content.Info{}, err
	}

	var labels map[string]string
	if s.ls != nil {
		labels, err = s.ls.Get(dgst)
		if err != nil {
			return content.Info{}, err
		}
	}
	return s.info(dgst, fi, labels), nil
}

func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]string) content.Info {
	return content.Info{
		Digest:    dgst,
		Size:      fi.Size(),
		CreatedAt: fi.ModTime(),
		UpdatedAt: getATime(fi),
		Labels:    labels,
	}
}

// ReaderAt returns an io.ReaderAt for the blob.
func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
	p, err := s.blobPath(desc.Digest)
	if err != nil {
		return nil, fmt.Errorf("calculating blob path for ReaderAt: %w", err)
	}

	reader, err := OpenReader(p)
	if err != nil {
		return nil, fmt.Errorf("blob %s expected at %s: %w", desc.Digest, p, err)
	}

	return reader, nil
}

// Delete removes a blob by its digest.
//
// While this is safe to do concurrently, any check-for-existence-then-remove
// logic must hold a global lock on the store to be safe.
func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
	bp, err := s.blobPath(dgst)
	if err != nil {
		return fmt.Errorf("calculating blob path for delete: %w", err)
	}

	if err := os.RemoveAll(bp); err != nil {
		if !os.IsNotExist(err) {
			return err
		}
		return fmt.Errorf("content %v: %w", dgst, errdefs.ErrNotFound)
	}

	return nil
}

func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
	if s.ls == nil {
		return content.Info{}, fmt.Errorf("update not supported on immutable content store: %w", errdefs.ErrFailedPrecondition)
	}

	p, err := s.blobPath(info.Digest)
	if err != nil {
		return content.Info{}, fmt.Errorf("calculating blob path for update: %w", err)
	}

	fi, err := os.Stat(p)
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("content %v: %w", info.Digest, errdefs.ErrNotFound)
		}
		return content.Info{}, err
	}

	var (
		all    bool
		labels map[string]string
	)
	if len(fieldpaths) > 0 {
		for _, path := range fieldpaths {
			if strings.HasPrefix(path, "labels.") {
				if labels == nil {
					labels = map[string]string{}
				}

				key := strings.TrimPrefix(path, "labels.")
				labels[key] = info.Labels[key]
				continue
			}

			switch path {
			case "labels":
				all = true
				labels = info.Labels
			default:
				return content.Info{}, fmt.Errorf("cannot update %q field on content info %q: %w", path, info.Digest, errdefs.ErrInvalidArgument)
			}
		}
	} else {
		all = true
		labels = info.Labels
	}

	if all {
		err = s.ls.Set(info.Digest, labels)
	} else {
		labels, err = s.ls.Update(info.Digest, labels)
	}
	if err != nil {
		return content.Info{}, err
	}

	info = s.info(info.Digest, fi, labels)
	info.UpdatedAt = time.Now()

	if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil {
		log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest)
	}

	return info, nil
}

func (s *store) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error {
	root := filepath.Join(s.root, "blobs")

	filter, err := filters.ParseAll(fs...)
	if err != nil {
		return err
	}

	var alg digest.Algorithm
	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !fi.IsDir() && !alg.Available() {
			return nil
		}

		// TODO(stevvooe): There are few more cases with subdirs that should be
		// handled in case the layout gets corrupted. This isn't strict enough
		// and may spew bad data.

		if path == root {
			return nil
		}
		if filepath.Dir(path) == root {
			alg = digest.Algorithm(filepath.Base(path))

			if !alg.Available() {
				alg = ""
				return filepath.SkipDir
			}

			// descending into a hash directory
			return nil
		}

		dgst := digest.NewDigestFromEncoded(alg, filepath.Base(path))
		if err := dgst.Validate(); err != nil {
			// log error but don't report
			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
			// if we see this, it could mean some sort of corruption of the
			// store or extra paths not expected previously.
		}

		var labels map[string]string
		if s.ls != nil {
			labels, err = s.ls.Get(dgst)
			if err != nil {
				return err
			}
		}

		info := s.info(dgst, fi, labels)
		if !filter.Match(content.AdaptInfo(info)) {
			return nil
		}
		return fn(info)
	})
}

func (s *store) Status(ctx context.Context, ref string) (content.Status, error) {
	return s.status(s.ingestRoot(ref))
}

func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
	fp, err := os.Open(filepath.Join(s.root, "ingest"))
	if err != nil {
		return nil, err
	}
	defer fp.Close()

	fis, err := fp.Readdirnames(-1)
	if err != nil {
		return nil, err
	}

	filter, err := filters.ParseAll(fs...)
	if err != nil {
		return nil, err
	}

	var active []content.Status
	for _, fi := range fis {
		p := filepath.Join(s.root, "ingest", fi)
		stat, err := s.status(p)
		if err != nil {
			if !os.IsNotExist(err) {
				return nil, err
			}

			// TODO(stevvooe): This is a common error if uploads are being
			// completed while making this listing. Need to consider taking a
			// lock on the whole store to coordinate this aspect.
			//
			// Another option is to cleanup downloads asynchronously and
			// coordinate this method with the cleanup process.
			//
			// For now, we just skip them, as they really don't exist.
			continue
		}

		if filter.Match(adaptStatus(stat)) {
			active = append(active, stat)
		}
	}

	return active, nil
}

// WalkStatusRefs is used to walk all status references. Failed status reads
// are logged and skipped; such errors are expected if this function is called
// while references are being altered.
func (s *store) WalkStatusRefs(ctx context.Context, fn func(string) error) error {
	fp, err := os.Open(filepath.Join(s.root, "ingest"))
	if err != nil {
		return err
	}
	defer fp.Close()

	fis, err := fp.Readdirnames(-1)
	if err != nil {
		return err
	}

	for _, fi := range fis {
		rf := filepath.Join(s.root, "ingest", fi, "ref")

		ref, err := readFileString(rf)
		if err != nil {
			log.G(ctx).WithError(err).WithField("path", rf).Error("failed to read ingest ref")
			continue
		}

		if err := fn(ref); err != nil {
			return err
		}
	}

	return nil
}

// status works like Status above, except it takes the path to the ingest
// directly.
func (s *store) status(ingestPath string) (content.Status, error) {
	dp := filepath.Join(ingestPath, "data")
	fi, err := os.Stat(dp)
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("%s: %w", err.Error(), errdefs.ErrNotFound)
		}
		return content.Status{}, err
	}

	ref, err := readFileString(filepath.Join(ingestPath, "ref"))
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("%s: %w", err.Error(), errdefs.ErrNotFound)
		}
		return content.Status{}, err
	}

	startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat"))
	if err != nil {
		return content.Status{}, fmt.Errorf("could not read startedat: %w", err)
	}

	updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat"))
	if err != nil {
		return content.Status{}, fmt.Errorf("could not read updatedat: %w", err)
	}

	// because we don't write updatedat on every write, the mod time may
	// actually be more up to date.
	if fi.ModTime().After(updatedAt) {
		updatedAt = fi.ModTime()
	}

	return content.Status{
		Ref:       ref,
		Offset:    fi.Size(),
		Total:     s.total(ingestPath),
		UpdatedAt: updatedAt,
		StartedAt: startedAt,
	}, nil
}

func adaptStatus(status content.Status) filters.Adaptor {
	return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
		if len(fieldpath) == 0 {
			return "", false
		}
		switch fieldpath[0] {
		case "ref":
			return status.Ref, true
		}

		return "", false
	})
}
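
Since adaptStatus exposes only the ref field to the filter grammar, a caller can select a single ingest like this (sketch; any other field never matches):

statuses, err := cs.ListStatuses(ctx, "ref==myref")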
// total attempts to resolve the total expected size for the write.
func (s *store) total(ingestPath string) int64 {
	totalS, err := readFileString(filepath.Join(ingestPath, "total"))
	if err != nil {
		return 0
	}

	total, err := strconv.ParseInt(totalS, 10, 64)
	if err != nil {
		// represents a corrupted file, should probably remove.
		return 0
	}

	return total
}

// Writer begins or resumes the active writer identified by ref. If the writer
// is already in use, an error is returned. Only one writer may be in use per
// ref at a time.
//
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
	var wOpts content.WriterOpts
	for _, opt := range opts {
		if err := opt(&wOpts); err != nil {
			return nil, err
		}
	}

	// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
	// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
	if wOpts.Ref == "" {
		return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
	}

	if err := tryLock(wOpts.Ref); err != nil {
		return nil, err
	}

	w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
	if err != nil {
		unlock(wOpts.Ref)
		return nil, err
	}

	return w, nil // lock is now held by w.
}
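
The ref lock makes the one-writer-per-ref contract observable to callers; a second open on a live ref fails until the first writer commits or closes (sketch, assuming a store cs and context ctx):

w1, _ := cs.Writer(ctx, content.WithRef("layer-upload"))
_, err := cs.Writer(ctx, content.WithRef("layer-upload"))
// errdefs.IsUnavailable(err) is true while w1 holds the ref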
func (s *store) resumeStatus(ref string, total int64, digester digest.Digester) (content.Status, error) {
	path, _, data := s.ingestPaths(ref)
	status, err := s.status(path)
	if err != nil {
		return status, fmt.Errorf("failed reading status of resume write: %w", err)
	}
	if ref != status.Ref {
		// NOTE(stevvooe): This is fairly catastrophic. Either we have some
		// layout corruption or a hash collision for the ref key.
		return status, fmt.Errorf("ref key does not match: %v != %v", ref, status.Ref)
	}

	if total > 0 && status.Total > 0 && total != status.Total {
		return status, fmt.Errorf("provided total differs from status: %v != %v", total, status.Total)
	}

	//nolint:dupword
	// TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes
	fp, err := os.Open(data)
	if err != nil {
		return status, err
	}

	p := bufPool.Get().(*[]byte)
	status.Offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
	bufPool.Put(p)
	fp.Close()
	return status, err
}

// writer provides the main implementation of the Writer method. The caller
// must hold the ref lock and release it if an error is returned.
func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
	// TODO(stevvooe): Need to actually store expected here. We have
	// code in the service that shouldn't be dealing with this.
	if expected != "" {
		p, err := s.blobPath(expected)
		if err != nil {
			return nil, fmt.Errorf("calculating expected blob path for writer: %w", err)
		}
		if _, err := os.Stat(p); err == nil {
			return nil, fmt.Errorf("content %v: %w", expected, errdefs.ErrAlreadyExists)
		}
	}

	path, refp, data := s.ingestPaths(ref)

	var (
		digester  = digest.Canonical.Digester()
		offset    int64
		startedAt time.Time
		updatedAt time.Time
	)

	foundValidIngest := false

	// ensure that the ingest path has been created.
	if err := os.Mkdir(path, 0755); err != nil {
		if !os.IsExist(err) {
			return nil, err
		}
		status, err := s.resumeStatus(ref, total, digester)
		if err == nil {
			foundValidIngest = true
			updatedAt = status.UpdatedAt
			startedAt = status.StartedAt
			total = status.Total
			offset = status.Offset
		} else {
			log.G(ctx).Infof("failed to resume the status from path %s: %s. will recreate them", path, err.Error())
		}
	}

	if !foundValidIngest {
		startedAt = time.Now()
		updatedAt = startedAt

		// the ingest is new, we need to set up the target location.
		// write the ref to a file for later use
		if err := os.WriteFile(refp, []byte(ref), 0666); err != nil {
			return nil, err
		}

		if err := writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
			return nil, err
		}

		if err := writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
			return nil, err
		}

		if total > 0 {
			if err := os.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
				return nil, err
			}
		}
	}

	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return nil, fmt.Errorf("failed to open data file: %w", err)
	}

	if _, err := fp.Seek(offset, io.SeekStart); err != nil {
		fp.Close()
		return nil, fmt.Errorf("could not seek to current write offset: %w", err)
	}

	return &writer{
		s:         s,
		fp:        fp,
		ref:       ref,
		path:      path,
		offset:    offset,
		total:     total,
		digester:  digester,
		startedAt: startedAt,
		updatedAt: updatedAt,
	}, nil
}

// Abort an active transaction keyed by ref. If the ingest is active, it will
// be cancelled. Any resources associated with the ingest will be cleaned.
func (s *store) Abort(ctx context.Context, ref string) error {
	root := s.ingestRoot(ref)
	if err := os.RemoveAll(root); err != nil {
		if os.IsNotExist(err) {
			return fmt.Errorf("ingest ref %q: %w", ref, errdefs.ErrNotFound)
		}
		return err
	}

	return nil
}

func (s *store) blobPath(dgst digest.Digest) (string, error) {
	if err := dgst.Validate(); err != nil {
		return "", fmt.Errorf("cannot calculate blob path from invalid digest: %v: %w", err, errdefs.ErrInvalidArgument)
	}

	return filepath.Join(s.root, "blobs", dgst.Algorithm().String(), dgst.Encoded()), nil
}

func (s *store) ingestRoot(ref string) string {
	// we take a digest of the ref to keep the ingest paths constant length.
	// Note that this is not the current or potential digest of incoming content.
	dgst := digest.FromString(ref)
	return filepath.Join(s.root, "ingest", dgst.Encoded())
}

// ingestPaths returns the paths used by an ingest:
//
//   - root: entire ingest directory
//   - ref: name of the starting ref, must be unique
//   - data: file where data is written
func (s *store) ingestPaths(ref string) (string, string, string) {
	var (
		fp = s.ingestRoot(ref)
		rp = filepath.Join(fp, "ref")
		dp = filepath.Join(fp, "data")
	)

	return fp, rp, dp
}

func readFileString(path string) (string, error) {
	p, err := os.ReadFile(path)
	return string(p), err
}

// readFileTimestamp reads a file with just a timestamp present.
func readFileTimestamp(p string) (time.Time, error) {
	b, err := os.ReadFile(p)
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("%s: %w", err.Error(), errdefs.ErrNotFound)
		}
		return time.Time{}, err
	}

	var t time.Time
	if err := t.UnmarshalText(b); err != nil {
		return time.Time{}, fmt.Errorf("could not parse timestamp file %v: %w", p, err)
	}

	return t, nil
}

func writeTimestampFile(p string, t time.Time) error {
	b, err := t.MarshalText()
	if err != nil {
		return err
	}
	return writeToCompletion(p, b, 0666)
}

func writeToCompletion(path string, data []byte, mode os.FileMode) error {
	tmp := fmt.Sprintf("%s.tmp", path)
	f, err := os.OpenFile(tmp, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, mode)
	if err != nil {
		return fmt.Errorf("create tmp file: %w", err)
	}
	_, err = f.Write(data)
	f.Close()
	if err != nil {
		return fmt.Errorf("write tmp file: %w", err)
	}
	err = os.Rename(tmp, path)
	if err != nil {
		return fmt.Errorf("rename tmp file: %w", err)
	}
	return nil
}

View File

@@ -0,0 +1,33 @@
//go:build darwin || freebsd || netbsd

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"os"
	"syscall"
	"time"
)

func getATime(fi os.FileInfo) time.Time {
	if st, ok := fi.Sys().(*syscall.Stat_t); ok {
		return time.Unix(st.Atimespec.Unix())
	}

	return fi.ModTime()
}

View File

@@ -0,0 +1,33 @@
//go:build openbsd

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"os"
	"syscall"
	"time"
)

func getATime(fi os.FileInfo) time.Time {
	if st, ok := fi.Sys().(*syscall.Stat_t); ok {
		return time.Unix(st.Atim.Unix())
	}

	return fi.ModTime()
}

View File

@@ -0,0 +1,389 @@
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"bufio"
	"bytes"
	"context"
	"crypto/rand"
	_ "crypto/sha256" // required for digest package
	"fmt"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"runtime"
	"sync"
	"testing"
	"time"

	"github.com/containerd/containerd/v2/content"
	"github.com/containerd/containerd/v2/content/testsuite"
	"github.com/containerd/containerd/v2/errdefs"
	"github.com/containerd/containerd/v2/pkg/randutil"
	"github.com/containerd/containerd/v2/pkg/testutil"

	"github.com/opencontainers/go-digest"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/stretchr/testify/assert"
)

type memoryLabelStore struct {
	l      sync.Mutex
	labels map[digest.Digest]map[string]string
}

func newMemoryLabelStore() LabelStore {
	return &memoryLabelStore{
		labels: map[digest.Digest]map[string]string{},
	}
}

func (mls *memoryLabelStore) Get(d digest.Digest) (map[string]string, error) {
	mls.l.Lock()
	labels := mls.labels[d]
	mls.l.Unlock()

	return labels, nil
}

func (mls *memoryLabelStore) Set(d digest.Digest, labels map[string]string) error {
	mls.l.Lock()
	mls.labels[d] = labels
	mls.l.Unlock()

	return nil
}

func (mls *memoryLabelStore) Update(d digest.Digest, update map[string]string) (map[string]string, error) {
	mls.l.Lock()
	labels, ok := mls.labels[d]
	if !ok {
		labels = map[string]string{}
	}
	for k, v := range update {
		if v == "" {
			delete(labels, k)
		} else {
			labels[k] = v
		}
	}
	mls.labels[d] = labels
	mls.l.Unlock()

	return labels, nil
}

func TestContent(t *testing.T) {
	testsuite.ContentSuite(t, "fs", func(ctx context.Context, root string) (context.Context, content.Store, func() error, error) {
		cs, err := NewLabeledStore(root, newMemoryLabelStore())
		if err != nil {
			return nil, nil, nil, err
		}
		return ctx, cs, func() error {
			return nil
		}, nil
	})
}

func TestContentWriter(t *testing.T) {
	ctx, tmpdir, cs, cleanup := contentStoreEnv(t)
	defer cleanup()
	defer testutil.DumpDirOnFailure(t, tmpdir)

	if _, err := os.Stat(filepath.Join(tmpdir, "ingest")); os.IsNotExist(err) {
		t.Fatal("ingest dir should be created", err)
	}

	cw, err := cs.Writer(ctx, content.WithRef("myref"))
	if err != nil {
		t.Fatal(err)
	}
	if err := cw.Close(); err != nil {
		t.Fatal(err)
	}

	// reopen, so we can test things
	cw, err = cs.Writer(ctx, content.WithRef("myref"))
	if err != nil {
		t.Fatal(err)
	}

	// make sure that second resume also fails
	if _, err = cs.Writer(ctx, content.WithRef("myref")); err == nil {
		// TODO(stevvooe): This also works across processes. Need to find a way
		// to test that, as well.
		t.Fatal("no error on second resume")
	}

	// we should also see this as an active ingestion
	ingestions, err := cs.ListStatuses(ctx, "")
	if err != nil {
		t.Fatal(err)
	}

	// clear out the time and meta because we don't care for this test
	for i := range ingestions {
		ingestions[i].UpdatedAt = time.Time{}
		ingestions[i].StartedAt = time.Time{}
	}

	if !reflect.DeepEqual(ingestions, []content.Status{
		{
			Ref:    "myref",
			Offset: 0,
		},
	}) {
		t.Fatalf("unexpected ingestion set: %v", ingestions)
	}

	p := make([]byte, 4<<20)
	if _, err := rand.Read(p); err != nil {
		t.Fatal(err)
	}
	expected := digest.FromBytes(p)

	checkCopy(t, int64(len(p)), cw, bufio.NewReader(io.NopCloser(bytes.NewReader(p))))

	if err := cw.Commit(ctx, int64(len(p)), expected); err != nil {
		t.Fatal(err)
	}

	if err := cw.Close(); err != nil {
		t.Fatal(err)
	}

	cw, err = cs.Writer(ctx, content.WithRef("aref"))
	if err != nil {
		t.Fatal(err)
	}

	// now, attempt to write the same data again
	checkCopy(t, int64(len(p)), cw, bufio.NewReader(io.NopCloser(bytes.NewReader(p))))
	if err := cw.Commit(ctx, int64(len(p)), expected); err == nil {
		t.Fatal("expected already exists error")
	} else if !errdefs.IsAlreadyExists(err) {
		t.Fatal(err)
	}

	path := checkBlobPath(t, cs, expected)

	// read the data back, make sure it's the same
	pp, err := os.ReadFile(path)
	if err != nil {
		t.Fatal(err)
	}

	if !bytes.Equal(p, pp) {
		t.Fatal("mismatched data written to disk")
	}
}

func TestWalkBlobs(t *testing.T) {
	ctx, _, cs, cleanup := contentStoreEnv(t)
	defer cleanup()

	const (
		nblobs  = 79
		maxsize = 4 << 10
	)

	var (
		blobs    = populateBlobStore(ctx, t, cs, nblobs, maxsize)
		expected = map[digest.Digest]struct{}{}
		found    = map[digest.Digest]struct{}{}
	)

	for dgst := range blobs {
		expected[dgst] = struct{}{}
	}

	if err := cs.Walk(ctx, func(bi content.Info) error {
		found[bi.Digest] = struct{}{}
		checkBlobPath(t, cs, bi.Digest)
		return nil
	}); err != nil {
		t.Fatal(err)
	}

	if !reflect.DeepEqual(expected, found) {
		t.Fatalf("expected did not match found: %v != %v", found, expected)
	}
}

// BenchmarkIngests checks the insertion time over varying blob sizes.
//
// Note that at the time of writing there is roughly a 4ms insertion overhead
// for blobs. This seems to be due to the number of syscalls and file I/O we do
// coordinating the ingestion.
func BenchmarkIngests(b *testing.B) {
	ctx, _, cs, cleanup := contentStoreEnv(b)
	defer cleanup()

	for _, size := range []int64{
		1 << 10,
		4 << 10,
		512 << 10,
		1 << 20,
	} {
		size := size
		b.Run(fmt.Sprint(size), func(b *testing.B) {
			b.StopTimer()
			blobs := generateBlobs(b, int64(b.N), size)
			var bytes int64
			for _, blob := range blobs {
				bytes += int64(len(blob))
			}
			b.SetBytes(bytes)
			b.StartTimer()

			for dgst, p := range blobs {
				checkWrite(ctx, b, cs, dgst, p)
			}
		})
	}
}

type checker interface {
	Fatal(args ...interface{})
}

func generateBlobs(t checker, nblobs, maxsize int64) map[digest.Digest][]byte {
	blobs := map[digest.Digest][]byte{}

	for i := int64(0); i < nblobs; i++ {
		p := make([]byte, randutil.Int63n(maxsize))

		if _, err := rand.Read(p); err != nil {
			t.Fatal(err)
		}

		dgst := digest.FromBytes(p)
		blobs[dgst] = p
	}

	return blobs
}

func populateBlobStore(ctx context.Context, t checker, cs content.Store, nblobs, maxsize int64) map[digest.Digest][]byte {
	blobs := generateBlobs(t, nblobs, maxsize)

	for dgst, p := range blobs {
		checkWrite(ctx, t, cs, dgst, p)
	}

	return blobs
}

func checkCopy(t checker, size int64, dst io.Writer, src io.Reader) {
	nn, err := io.Copy(dst, src)
	if err != nil {
		t.Fatal(err)
	}

	if nn != size {
		t.Fatal("incorrect number of bytes copied")
	}
}

func checkBlobPath(t *testing.T, cs content.Store, dgst digest.Digest) string {
	path, err := cs.(*store).blobPath(dgst)
	if err != nil {
		t.Fatalf("failed to calculate blob path: %v", err)
	}

	if path != filepath.Join(cs.(*store).root, "blobs", dgst.Algorithm().String(), dgst.Encoded()) {
		t.Fatalf("unexpected path: %q", path)
	}
	fi, err := os.Stat(path)
	if err != nil {
		t.Fatalf("error stating blob path: %v", err)
	}

	if runtime.GOOS != "windows" {
		// ensure that only read bits are set.
		if ((fi.Mode() & os.ModePerm) & 0333) != 0 {
			t.Fatalf("incorrect permissions: %v", fi.Mode())
		}
	}

	return path
}

func checkWrite(ctx context.Context, t checker, cs content.Store, dgst digest.Digest, p []byte) digest.Digest {
	if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p),
		ocispec.Descriptor{Size: int64(len(p)), Digest: dgst}); err != nil {
		t.Fatal(err)
	}

	return dgst
}

func TestWriterTruncateRecoversFromIncompleteWrite(t *testing.T) {
	cs, err := NewStore(t.TempDir())
	assert.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ref := "ref"
	contentB := []byte("this is the content")
	total := int64(len(contentB))
	setupIncompleteWrite(ctx, t, cs, ref, total)

	writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
	assert.NoError(t, err)

	assert.Nil(t, writer.Truncate(0))

	_, err = writer.Write(contentB)
	assert.NoError(t, err)

	dgst := digest.FromBytes(contentB)
	err = writer.Commit(ctx, total, dgst)
	assert.NoError(t, err)
}

func setupIncompleteWrite(ctx context.Context, t *testing.T, cs content.Store, ref string, total int64) {
	writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
	assert.NoError(t, err)

	_, err = writer.Write([]byte("bad data"))
	assert.NoError(t, err)

	assert.Nil(t, writer.Close())
}

func TestWriteReadEmptyFileTimestamp(t *testing.T) {
	root := t.TempDir()
	emptyFile := filepath.Join(root, "updatedat")
	if err := writeTimestampFile(emptyFile, time.Time{}); err != nil {
		t.Errorf("failed to write Zero Time to file: %v", err)
	}

	timestamp, err := readFileTimestamp(emptyFile)
	if err != nil {
		t.Errorf("reading an empty timestamp file should succeed, but got error: %v", err)
	}
	if !timestamp.IsZero() {
		t.Errorf("reading an empty timestamp file should return time.Time{}, but got: %v", timestamp)
	}
}

View File

@@ -0,0 +1,33 @@
//go:build linux || solaris

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"os"
	"syscall"
	"time"
)

func getATime(fi os.FileInfo) time.Time {
	if st, ok := fi.Sys().(*syscall.Stat_t); ok {
		return time.Unix(st.Atim.Unix())
	}

	return fi.ModTime()
}

View File

@@ -0,0 +1,26 @@
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"os"
	"time"
)

func getATime(fi os.FileInfo) time.Time {
	return fi.ModTime()
}

View File

@@ -0,0 +1,38 @@
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"context"
	"testing"

	"github.com/containerd/containerd/v2/content"
)

func contentStoreEnv(t testing.TB) (context.Context, string, content.Store, func()) {
	tmpdir := t.TempDir()

	cs, err := NewStore(tmpdir)
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	return ctx, tmpdir, cs, func() {
		cancel()
	}
}

View File

@@ -0,0 +1,208 @@
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"time"

	"github.com/containerd/containerd/v2/content"
	"github.com/containerd/containerd/v2/errdefs"
	"github.com/containerd/log"
	"github.com/opencontainers/go-digest"
)

// writer represents a write transaction against the blob store.
type writer struct {
	s         *store
	fp        *os.File // opened data file
	path      string   // path to writer dir
	ref       string   // ref key
	offset    int64
	total     int64
	digester  digest.Digester
	startedAt time.Time
	updatedAt time.Time
}

func (w *writer) Status() (content.Status, error) {
	return content.Status{
		Ref:       w.ref,
		Offset:    w.offset,
		Total:     w.total,
		StartedAt: w.startedAt,
		UpdatedAt: w.updatedAt,
	}, nil
}

// Digest returns the current digest of the content, up to the current write.
//
// Cannot be called concurrently with `Write`.
func (w *writer) Digest() digest.Digest {
	return w.digester.Digest()
}

// Write p to the transaction.
//
// Note that writes are unbuffered to the backing file. When writing, it is
// recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer.
func (w *writer) Write(p []byte) (n int, err error) {
	n, err = w.fp.Write(p)
	w.digester.Hash().Write(p[:n])
	w.offset += int64(len(p))
	w.updatedAt = time.Now()
	return n, err
}
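
Per the note above, a sketch of the preferred copy pattern; it reuses this package's 1 MiB bufPool from store.go, with cw a content.Writer and r the source stream (both assumed):

buf := bufPool.Get().(*[]byte)
_, err := io.CopyBuffer(cw, r, *buf)
bufPool.Put(buf)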
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
	// Ensure the ref lock is released even on error.
	defer unlock(w.ref)
	var base content.Info
	for _, opt := range opts {
		if err := opt(&base); err != nil {
			return err
		}
	}

	fp := w.fp
	w.fp = nil

	if fp == nil {
		return fmt.Errorf("cannot commit on closed writer: %w", errdefs.ErrFailedPrecondition)
	}

	if err := fp.Sync(); err != nil {
		fp.Close()
		return fmt.Errorf("sync failed: %w", err)
	}

	fi, err := fp.Stat()
	closeErr := fp.Close()
	if err != nil {
		return fmt.Errorf("stat on ingest file failed: %w", err)
	}
	if closeErr != nil {
		return fmt.Errorf("failed to close ingest file: %w", closeErr)
	}

	if size > 0 && size != fi.Size() {
		return fmt.Errorf("unexpected commit size %d, expected %d: %w", fi.Size(), size, errdefs.ErrFailedPrecondition)
	}

	dgst := w.digester.Digest()
	if expected != "" && expected != dgst {
		return fmt.Errorf("unexpected commit digest %s, expected %s: %w", dgst, expected, errdefs.ErrFailedPrecondition)
	}

	var (
		ingest    = filepath.Join(w.path, "data")
		target, _ = w.s.blobPath(dgst) // ignore error because we calculated this dgst
	)

	// make sure parent directories of blob exist
	if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
		return err
	}

	if _, err := os.Stat(target); err == nil {
		// collision with the target file!
		if err := os.RemoveAll(w.path); err != nil {
			log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Error("failed to remove ingest directory")
		}
		return fmt.Errorf("content %v: %w", dgst, errdefs.ErrAlreadyExists)
	}

	if err := os.Rename(ingest, target); err != nil {
		return err
	}

	// Ingest has now been made available in the content store, attempt to complete
	// setting metadata but errors should only be logged and not returned since
	// the content store cannot be cleanly rolled back.

	commitTime := time.Now()
	if err := os.Chtimes(target, commitTime, commitTime); err != nil {
		log.G(ctx).WithField("digest", dgst).Error("failed to change file time to commit time")
	}

	// clean up!!
	if err := os.RemoveAll(w.path); err != nil {
		log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Error("failed to remove ingest directory")
	}

	if w.s.ls != nil && base.Labels != nil {
		if err := w.s.ls.Set(dgst, base.Labels); err != nil {
			log.G(ctx).WithField("digest", dgst).Error("failed to set labels")
		}
	}

	// change to readonly, more important for read, but provides _some_
	// protection from this point on. We use the existing perms with a mask
	// only allowing reads honoring the umask on creation.
	//
	// This removes write and exec, only allowing read per the creation umask.
	//
	// NOTE: Windows does not support this operation
	if runtime.GOOS != "windows" {
		if err := os.Chmod(target, (fi.Mode()&os.ModePerm)&^0333); err != nil {
			log.G(ctx).WithField("ref", w.ref).Error("failed to make readonly")
		}
	}

	return nil
}

// Close the writer, flushing any unwritten data and leaving the progress
// intact.
//
// If one needs to resume the transaction, a new writer can be obtained from
// `Ingester.Writer` using the same key. The write can then be continued
// from where it was left off.
//
// To abandon a transaction completely, first call close then `IngestManager.Abort` to
// clean up the associated resources.
func (w *writer) Close() (err error) {
	if w.fp != nil {
		w.fp.Sync()
		err = w.fp.Close()
		writeTimestampFile(filepath.Join(w.path, "updatedat"), w.updatedAt)
		w.fp = nil
		unlock(w.ref)
		return
	}

	return nil
}

func (w *writer) Truncate(size int64) error {
	if size != 0 {
		return errors.New("Truncate: unsupported size")
	}
	w.offset = 0
	w.digester.Hash().Reset()
	if _, err := w.fp.Seek(0, io.SeekStart); err != nil {
		return err
	}
	return w.fp.Truncate(0)
}