Move content store implementation
Move the filesystem content store implementation to a subpackage of content. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
parent
9b53b8b68d
commit
442365248b
@ -3,20 +3,11 @@ package content
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/opencontainers/go-digest"
|
"github.com/opencontainers/go-digest"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
bufPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return make([]byte, 1<<20)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
type Provider interface {
|
type Provider interface {
|
||||||
Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error)
|
Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error)
|
||||||
ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error)
|
ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error)
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package content
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
@ -1,4 +1,4 @@
|
|||||||
package content
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
@ -1,4 +1,4 @@
|
|||||||
package content
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -8,8 +8,10 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/filters"
|
"github.com/containerd/containerd/filters"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
@ -17,6 +19,14 @@ import (
|
|||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
bufPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return make([]byte, 1<<20)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// Store is digest-keyed store for content. All data written into the store is
|
// Store is digest-keyed store for content. All data written into the store is
|
||||||
// stored under a verifiable digest.
|
// stored under a verifiable digest.
|
||||||
//
|
//
|
||||||
@ -26,7 +36,7 @@ type store struct {
|
|||||||
root string
|
root string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStore(root string) (Store, error) {
|
func NewStore(root string) (content.Store, error) {
|
||||||
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
|
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -36,7 +46,7 @@ func NewStore(root string) (Store, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Info(ctx context.Context, dgst digest.Digest) (Info, error) {
|
func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
|
||||||
p := s.blobPath(dgst)
|
p := s.blobPath(dgst)
|
||||||
fi, err := os.Stat(p)
|
fi, err := os.Stat(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -44,14 +54,14 @@ func (s *store) Info(ctx context.Context, dgst digest.Digest) (Info, error) {
|
|||||||
err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
|
err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
|
||||||
}
|
}
|
||||||
|
|
||||||
return Info{}, err
|
return content.Info{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.info(dgst, fi), nil
|
return s.info(dgst, fi), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) info(dgst digest.Digest, fi os.FileInfo) Info {
|
func (s *store) info(dgst digest.Digest, fi os.FileInfo) content.Info {
|
||||||
return Info{
|
return content.Info{
|
||||||
Digest: dgst,
|
Digest: dgst,
|
||||||
Size: fi.Size(),
|
Size: fi.Size(),
|
||||||
CreatedAt: fi.ModTime(),
|
CreatedAt: fi.ModTime(),
|
||||||
@ -93,12 +103,12 @@ func (cs *store) Delete(ctx context.Context, dgst digest.Digest) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *store) Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error) {
|
func (cs *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
|
||||||
// TODO: Support persisting and updating mutable content data
|
// TODO: Support persisting and updating mutable content data
|
||||||
return Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
|
return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *store) Walk(ctx context.Context, fn WalkFunc, filters ...string) error {
|
func (cs *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
|
||||||
// TODO: Support filters
|
// TODO: Support filters
|
||||||
root := filepath.Join(cs.root, "blobs")
|
root := filepath.Join(cs.root, "blobs")
|
||||||
var alg digest.Algorithm
|
var alg digest.Algorithm
|
||||||
@ -141,11 +151,11 @@ func (cs *store) Walk(ctx context.Context, fn WalkFunc, filters ...string) error
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Status(ctx context.Context, ref string) (Status, error) {
|
func (s *store) Status(ctx context.Context, ref string) (content.Status, error) {
|
||||||
return s.status(s.ingestRoot(ref))
|
return s.status(s.ingestRoot(ref))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]Status, error) {
|
func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
|
||||||
fp, err := os.Open(filepath.Join(s.root, "ingest"))
|
fp, err := os.Open(filepath.Join(s.root, "ingest"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -163,7 +173,7 @@ func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]Status, error
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var active []Status
|
var active []content.Status
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
p := filepath.Join(s.root, "ingest", fi.Name())
|
p := filepath.Join(s.root, "ingest", fi.Name())
|
||||||
stat, err := s.status(p)
|
stat, err := s.status(p)
|
||||||
@ -192,19 +202,19 @@ func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]Status, error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// status works like stat above except uses the path to the ingest.
|
// status works like stat above except uses the path to the ingest.
|
||||||
func (s *store) status(ingestPath string) (Status, error) {
|
func (s *store) status(ingestPath string) (content.Status, error) {
|
||||||
dp := filepath.Join(ingestPath, "data")
|
dp := filepath.Join(ingestPath, "data")
|
||||||
fi, err := os.Stat(dp)
|
fi, err := os.Stat(dp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Status{}, err
|
return content.Status{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ref, err := readFileString(filepath.Join(ingestPath, "ref"))
|
ref, err := readFileString(filepath.Join(ingestPath, "ref"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Status{}, err
|
return content.Status{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return Status{
|
return content.Status{
|
||||||
Ref: ref,
|
Ref: ref,
|
||||||
Offset: fi.Size(),
|
Offset: fi.Size(),
|
||||||
Total: s.total(ingestPath),
|
Total: s.total(ingestPath),
|
||||||
@ -213,7 +223,7 @@ func (s *store) status(ingestPath string) (Status, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func adaptStatus(status Status) filters.Adaptor {
|
func adaptStatus(status content.Status) filters.Adaptor {
|
||||||
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
||||||
if len(fieldpath) == 0 {
|
if len(fieldpath) == 0 {
|
||||||
return "", false
|
return "", false
|
||||||
@ -248,7 +258,7 @@ func (s *store) total(ingestPath string) int64 {
|
|||||||
// ref at a time.
|
// ref at a time.
|
||||||
//
|
//
|
||||||
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
|
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
|
||||||
func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (Writer, error) {
|
func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
|
||||||
// TODO(stevvooe): Need to actually store expected here. We have
|
// TODO(stevvooe): Need to actually store expected here. We have
|
||||||
// code in the service that shouldn't be dealing with this.
|
// code in the service that shouldn't be dealing with this.
|
||||||
if expected != "" {
|
if expected != "" {
|
||||||
@ -384,3 +394,8 @@ func (s *store) ingestPaths(ref string) (string, string, string) {
|
|||||||
|
|
||||||
return fp, rp, dp
|
return fp, rp, dp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func readFileString(path string) (string, error) {
|
||||||
|
p, err := ioutil.ReadFile(path)
|
||||||
|
return string(p), err
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package content
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
@ -1,6 +1,6 @@
|
|||||||
// +build darwin freebsd
|
// +build darwin freebsd
|
||||||
|
|
||||||
package content
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
@ -1,4 +1,4 @@
|
|||||||
package content
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
@ -1,4 +1,4 @@
|
|||||||
package content
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/opencontainers/go-digest"
|
"github.com/opencontainers/go-digest"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -24,8 +25,8 @@ type writer struct {
|
|||||||
updatedAt time.Time
|
updatedAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *writer) Status() (Status, error) {
|
func (w *writer) Status() (content.Status, error) {
|
||||||
return Status{
|
return content.Status{
|
||||||
Ref: w.ref,
|
Ref: w.ref,
|
||||||
Offset: w.offset,
|
Offset: w.offset,
|
||||||
Total: w.total,
|
Total: w.total,
|
@ -5,12 +5,21 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/opencontainers/go-digest"
|
"github.com/opencontainers/go-digest"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
bufPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return make([]byte, 1<<20)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// ReadBlob retrieves the entire contents of the blob from the provider.
|
// ReadBlob retrieves the entire contents of the blob from the provider.
|
||||||
//
|
//
|
||||||
// Avoid using this for large blobs, such as layers.
|
// Avoid using this for large blobs, such as layers.
|
||||||
@ -121,8 +130,3 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
|
|||||||
|
|
||||||
return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset)
|
return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
func readFileString(path string) (string, error) {
|
|
||||||
p, err := ioutil.ReadFile(path)
|
|
||||||
return string(p), err
|
|
||||||
}
|
|
||||||
|
@ -19,7 +19,7 @@ import (
|
|||||||
snapshot "github.com/containerd/containerd/api/services/snapshot/v1"
|
snapshot "github.com/containerd/containerd/api/services/snapshot/v1"
|
||||||
tasks "github.com/containerd/containerd/api/services/tasks/v1"
|
tasks "github.com/containerd/containerd/api/services/tasks/v1"
|
||||||
version "github.com/containerd/containerd/api/services/version/v1"
|
version "github.com/containerd/containerd/api/services/version/v1"
|
||||||
store "github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content/fs"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
@ -161,7 +161,7 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) {
|
|||||||
Type: plugin.ContentPlugin,
|
Type: plugin.ContentPlugin,
|
||||||
ID: "content",
|
ID: "content",
|
||||||
Init: func(ic *plugin.InitContext) (interface{}, error) {
|
Init: func(ic *plugin.InitContext) (interface{}, error) {
|
||||||
return store.NewStore(ic.Root)
|
return fs.NewStore(ic.Root)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
plugin.Register(&plugin.Registration{
|
plugin.Register(&plugin.Registration{
|
||||||
|
Loading…
Reference in New Issue
Block a user