Use image.IsUnpacked

fix #361
Signed-off-by: yanxuean <yan.xuean@zte.com.cn>
This commit is contained in:
yanxuean
2017-11-01 11:26:33 +08:00
parent b27a4c1723
commit 9027a02e8e
27 changed files with 174 additions and 256 deletions

View File

@@ -205,4 +205,4 @@ is released under the Apache 2.0 license. The README.md file, and files in the
"docs" folder are licensed under the Creative Commons Attribution 4.0
International License under the terms and conditions set forth in the file
"LICENSE.docs". You may obtain a duplicate copy of the same license, titled
CC-BY-SA-4.0, at http://creativecommons.org/licenses/by/4.0/.
CC-BY-4.0, at http://creativecommons.org/licenses/by/4.0/.

View File

@@ -24,6 +24,7 @@ func dialer(address string, timeout time.Duration) (net.Conn, error) {
return winio.DialPipe(address, &timeout)
}
// DialAddress returns the dial address
func DialAddress(address string) string {
return address
}

View File

@@ -10,7 +10,7 @@ import (
"sync"
)
// Resourcetype represents type of resource at a node
// ResourceType represents type of resource at a node
type ResourceType uint8
// Node presents a resource which has a type and key,
@@ -145,10 +145,10 @@ func ConcurrentMark(ctx context.Context, root <-chan Node, refs func(context.Con
// Sweep removes all nodes returned through the channel which are not in
// the reachable set by calling the provided remove function.
func Sweep(reachable map[Node]struct{}, all <-chan Node, remove func(Node) error) error {
func Sweep(reachable map[Node]struct{}, all []Node, remove func(Node) error) error {
// All black objects are now reachable, and all white objects are
// unreachable. Free those that are white!
for node := range all {
for _, node := range all {
if _, ok := reachable[node]; !ok {
if err := remove(node); err != nil {
return err

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/rootfs"
@@ -30,6 +31,8 @@ type Image interface {
Size(ctx context.Context) (int64, error)
// Config descriptor for the image.
Config(ctx context.Context) (ocispec.Descriptor, error)
// IsUnpacked returns whether or not an image is unpacked.
IsUnpacked(context.Context, string) (bool, error)
}
var _ = (Image)(&image{})
@@ -63,6 +66,26 @@ func (i *image) Config(ctx context.Context) (ocispec.Descriptor, error) {
return i.i.Config(ctx, provider, platforms.Default())
}
func (i *image) IsUnpacked(ctx context.Context, snapshotterName string) (bool, error) {
sn := i.client.SnapshotService(snapshotterName)
cs := i.client.ContentStore()
diffs, err := i.i.RootFS(ctx, cs, platforms.Default())
if err != nil {
return false, err
}
chainID := identity.ChainID(diffs)
_, err = sn.Stat(ctx, chainID.String())
if err == nil {
return true, nil
} else if !errdefs.IsNotFound(err) {
return false, err
}
return false, nil
}
func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
layers, err := i.getLayers(ctx, platforms.Default())
if err != nil {

View File

@@ -190,6 +190,7 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error {
return m.db.Update(fn)
}
// GarbageCollect starts garbage collection
func (m *DB) GarbageCollect(ctx context.Context) error {
lt1 := time.Now()
m.wlock.Lock()
@@ -198,39 +199,8 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected")
}()
var marked map[gc.Node]struct{}
if err := m.db.View(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
roots := make(chan gc.Node)
errChan := make(chan error)
go func() {
defer close(errChan)
defer close(roots)
// Call roots
if err := scanRoots(ctx, tx, roots); err != nil {
cancel()
errChan <- err
}
}()
refs := func(ctx context.Context, n gc.Node, fn func(gc.Node)) error {
return references(ctx, tx, n, fn)
}
reachable, err := gc.ConcurrentMark(ctx, roots, refs)
if rerr := <-errChan; rerr != nil {
return rerr
}
if err != nil {
return err
}
marked = reachable
return nil
}); err != nil {
marked, err := m.getMarked(ctx)
if err != nil {
return err
}
@@ -241,15 +211,11 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodeC := make(chan gc.Node)
var scanErr error
rm := func(ctx context.Context, n gc.Node) error {
if _, ok := marked[n]; ok {
return nil
}
go func() {
defer close(nodeC)
scanErr = scanAll(ctx, tx, nodeC)
}()
rm := func(n gc.Node) error {
if n.Type == ResourceSnapshot {
if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
m.dirtySS[n.Key[:idx]] = struct{}{}
@@ -260,12 +226,8 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
return remove(ctx, tx, n)
}
if err := gc.Sweep(marked, nodeC, rm); err != nil {
return errors.Wrap(err, "failed to sweep")
}
if scanErr != nil {
return errors.Wrap(scanErr, "failed to scan all")
if err := scanAll(ctx, tx, rm); err != nil {
return errors.Wrap(err, "failed to scan and remove")
}
return nil
@@ -292,6 +254,54 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
return nil
}
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
var marked map[gc.Node]struct{}
if err := m.db.View(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var (
nodes []gc.Node
wg sync.WaitGroup
roots = make(chan gc.Node)
)
wg.Add(1)
go func() {
defer wg.Done()
for n := range roots {
nodes = append(nodes, n)
}
}()
// Call roots
if err := scanRoots(ctx, tx, roots); err != nil {
cancel()
return err
}
close(roots)
wg.Wait()
refs := func(n gc.Node) ([]gc.Node, error) {
var sn []gc.Node
if err := references(ctx, tx, n, func(nn gc.Node) {
sn = append(sn, nn)
}); err != nil {
return nil, err
}
return sn, nil
}
reachable, err := gc.Tricolor(nodes, refs)
if err != nil {
return err
}
marked = reachable
return nil
}); err != nil {
return nil, err
}
return marked, nil
}
func (m *DB) cleanupSnapshotter(name string) {
ctx := context.Background()
sn, ok := m.ss[name]

View File

@@ -12,10 +12,15 @@ import (
)
const (
// ResourceUnknown specifies an unknown resource
ResourceUnknown gc.ResourceType = iota
// ResourceContent specifies a content resource
ResourceContent
// ResourceSnapshot specifies a snapshot resource
ResourceSnapshot
// ResourceContainer specifies a container resource
ResourceContainer
// ResourceTask specifies a task resource
ResourceTask
)
@@ -174,7 +179,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
return nil
}
func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc.Node) error) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
@@ -201,12 +206,8 @@ func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
if v != nil {
return nil
}
select {
case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
node := gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k))
return fn(ctx, node)
})
}); err != nil {
return err
@@ -222,12 +223,8 @@ func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
if v != nil {
return nil
}
select {
case nc <- gcnode(ResourceContent, ns, string(k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
node := gcnode(ResourceContent, ns, string(k))
return fn(ctx, node)
}); err != nil {
return err
}

View File

@@ -1,83 +0,0 @@
package mount
// On Solaris we can't invoke the mount system call directly. First,
// the mount system call takes more than 6 arguments, and go doesn't
// support invoking system calls that take more than 6 arguments. Past
// that, the mount system call is a private interfaces. For example,
// the arguments and data structures passed to the kernel to create an
// nfs mount are private and can change at any time. The only public
// and stable interface for creating mounts on Solaris is the mount.8
// command, so we'll invoke that here.
import (
"bytes"
"errors"
"fmt"
"os/exec"
"strings"
"golang.org/x/sys/unix"
)
const (
mountCmd = "/usr/sbin/mount"
)
func doMount(arg ...string) error {
cmd := exec.Command(mountCmd, arg...)
/* Setup Stdin, Stdout, and Stderr */
stderr := new(bytes.Buffer)
cmd.Stdin = nil
cmd.Stdout = nil
cmd.Stderr = stderr
/*
* Run the command. If the command fails create a new error
* object to return that includes stderr output.
*/
err := cmd.Start()
if err != nil {
return err
}
err = cmd.Wait()
if err != nil {
return errors.New(fmt.Sprintf("%v: %s", err, stderr.String()))
}
return nil
}
func (m *Mount) Mount(target string) error {
var err error
if len(m.Options) == 0 {
err = doMount("-F", m.Type, m.Source, target)
} else {
err = doMount("-F", m.Type, "-o", strings.Join(m.Options, ","),
m.Source, target)
}
return err
}
func Unmount(mount string, flags int) error {
return unix.Unmount(mount, flags)
}
// UnmountAll repeatedly unmounts the given mount point until there
// are no mounts remaining (EINVAL is returned by mount), which is
// useful for undoing a stack of mounts on the same mount point.
func UnmountAll(mount string, flags int) error {
for {
if err := Unmount(mount, flags); err != nil {
// EINVAL is returned if the target is not a
// mount point, indicating that we are
// done. It can also indicate a few other
// things (such as invalid flags) which we
// unfortunately end up squelching here too.
if err == unix.EINVAL {
return nil
}
return err
}
}
}

View File

@@ -3,17 +3,21 @@ package mount
import "github.com/pkg/errors"
var (
// ErrNotImplementOnWindows is returned when an action is not implemented for windows
ErrNotImplementOnWindows = errors.New("not implemented under windows")
)
// Mount to the provided target
func (m *Mount) Mount(target string) error {
return ErrNotImplementOnWindows
}
// Unmount the mount at the provided path
func Unmount(mount string, flags int) error {
return ErrNotImplementOnWindows
}
// UnmountAll mounts at the provided path
func UnmountAll(mount string, flags int) error {
return ErrNotImplementOnWindows
}

View File

@@ -1,50 +0,0 @@
// +build solaris,cgo
package mount
/*
#include <stdio.h>
#include <stdlib.h>
#include <sys/mnttab.h>
*/
import "C"
import (
"fmt"
"unsafe"
)
// Self retrieves a list of mounts for the current running process.
func Self() ([]Info, error) {
path := C.CString(C.MNTTAB)
defer C.free(unsafe.Pointer(path))
mode := C.CString("r")
defer C.free(unsafe.Pointer(mode))
mnttab := C.fopen(path, mode)
if mnttab == nil {
return nil, fmt.Errorf("Failed to open %s", C.MNTTAB)
}
var out []Info
var mp C.struct_mnttab
ret := C.getmntent(mnttab, &mp)
for ret == 0 {
var mountinfo Info
mountinfo.Mountpoint = C.GoString(mp.mnt_mountp)
mountinfo.Source = C.GoString(mp.mnt_special)
mountinfo.FSType = C.GoString(mp.mnt_fstype)
mountinfo.Options = C.GoString(mp.mnt_mntopts)
out = append(out, mountinfo)
ret = C.getmntent(mnttab, &mp)
}
C.fclose(mnttab)
return out, nil
}
// PID collects the mounts for a specific process ID.
func PID(pid int) ([]Info, error) {
return nil, fmt.Errorf("mountinfo.PID is not implemented on solaris")
}

View File

@@ -22,11 +22,11 @@ type InitContext struct {
Meta *Meta // plugins can fill in metadata at init.
plugins *PluginSet
plugins *Set
}
// NewContext returns a new plugin InitContext
func NewContext(ctx context.Context, r *Registration, plugins *PluginSet, root, state string) *InitContext {
func NewContext(ctx context.Context, r *Registration, plugins *Set, root, state string) *InitContext {
return &InitContext{
Context: log.WithModule(ctx, r.URI()),
Root: filepath.Join(root, r.URI()),
@@ -72,26 +72,26 @@ func (p *Plugin) Instance() (interface{}, error) {
return p.instance, p.err
}
// PluginSet defines a plugin collection, used with InitContext.
// Set defines a plugin collection, used with InitContext.
//
// This maintains ordering and unique indexing over the set.
//
// After iteratively instantiating plugins, this set should represent, the
// ordered, initialization set of plugins for a containerd instance.
type PluginSet struct {
type Set struct {
ordered []*Plugin // order of initialization
byTypeAndID map[Type]map[string]*Plugin
}
// NewPluginSet returns an initialized plugin set
func NewPluginSet() *PluginSet {
return &PluginSet{
func NewPluginSet() *Set {
return &Set{
byTypeAndID: make(map[Type]map[string]*Plugin),
}
}
// Add a plugin to the set
func (ps *PluginSet) Add(p *Plugin) error {
func (ps *Set) Add(p *Plugin) error {
if byID, typeok := ps.byTypeAndID[p.Registration.Type]; !typeok {
ps.byTypeAndID[p.Registration.Type] = map[string]*Plugin{
p.Registration.ID: p,
@@ -107,7 +107,7 @@ func (ps *PluginSet) Add(p *Plugin) error {
}
// Get returns the first plugin by its type
func (ps *PluginSet) Get(t Type) (interface{}, error) {
func (ps *Set) Get(t Type) (interface{}, error) {
for _, v := range ps.byTypeAndID[t] {
return v.Instance()
}

View File

@@ -58,9 +58,13 @@ const (
// Registration contains information for registering a plugin
type Registration struct {
Type Type
ID string
Config interface{}
// Type of the plugin
Type Type
// ID of the plugin
ID string
// Config specific to the plugin
Config interface{}
// Requires is a list of plugins that the registered plugin requires to be available
Requires []Type
// InitFn is called when initializing a plugin. The registration and
@@ -69,6 +73,7 @@ type Registration struct {
InitFn func(*InitContext) (interface{}, error)
}
// Init the registered plugin
func (r *Registration) Init(ic *InitContext) *Plugin {
p, err := r.InitFn(ic)
return &Plugin{

View File

@@ -9,6 +9,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"strings"
"sync"
"time"
@@ -215,13 +216,26 @@ func (c *Converter) fetchManifest(ctx context.Context, desc ocispec.Descriptor)
func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch blob")
ref := remotes.MakeRefKey(ctx, desc)
calc := newBlobStateCalculator()
var (
ref = remotes.MakeRefKey(ctx, desc)
calc = newBlobStateCalculator()
retry = 16
)
tryit:
cw, err := c.contentStore.Writer(ctx, ref, desc.Size, desc.Digest)
if err != nil {
if !errdefs.IsAlreadyExists(err) {
if errdefs.IsUnavailable(err) {
select {
case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))):
if retry < 2048 {
retry = retry << 1
}
goto tryit
case <-ctx.Done():
return err
}
} else if !errdefs.IsAlreadyExists(err) {
return err
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"time"
"github.com/containerd/containerd/content"
@@ -84,7 +85,7 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
// of writer and abort if not updated recently.
select {
case <-time.After(time.Millisecond * time.Duration(retry)):
case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))):
if retry < 2048 {
retry = retry << 1
}

View File

@@ -44,6 +44,6 @@ func (ra *remoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
return n, nil
}
func (rr *remoteReaderAt) Close() error {
func (ra *remoteReaderAt) Close() error {
return nil
}

View File

@@ -15,6 +15,7 @@ type remoteStore struct {
client contentapi.ContentClient
}
// NewStoreFromClient returns a new content store
func NewStoreFromClient(client contentapi.ContentClient) content.Store {
return &remoteStore{
client: client,

View File

@@ -9,7 +9,7 @@ import (
"golang.org/x/net/context"
)
// NewApplierFromClient returns a new Applier which communicates
// NewDiffServiceFromClient returns a new diff service which communicates
// over a GRPC connection.
func NewDiffServiceFromClient(client diffapi.DiffClient) diff.Differ {
return &remote{

View File

@@ -13,6 +13,7 @@ type remoteStore struct {
client imagesapi.ImagesClient
}
// NewStoreFromClient returns a new image store client
func NewStoreFromClient(client imagesapi.ImagesClient) images.Store {
return &remoteStore{
client: client,

View File

@@ -10,6 +10,7 @@ import (
"github.com/gogo/protobuf/types"
)
// NewStoreFromClient returns a new namespace store
func NewStoreFromClient(client api.NamespacesClient) namespaces.Store {
return &remote{client: client}
}

View File

@@ -20,6 +20,9 @@ const (
KindCommitted
)
// ParseKind parses the provided string into a Kind
//
// If the string cannot be parsed KindUnknown is returned
func ParseKind(s string) Kind {
s = strings.ToLower(s)
switch s {
@@ -34,6 +37,7 @@ func ParseKind(s string) Kind {
return KindUnknown
}
// String returns the string representation of the Kind
func (k Kind) String() string {
switch k {
case KindView:
@@ -47,10 +51,12 @@ func (k Kind) String() string {
return "Unknown"
}
// MarshalJSON the Kind to JSON
func (k Kind) MarshalJSON() ([]byte, error) {
return json.Marshal(k.String())
}
// UnmarshalJSON the Kind from JSON
func (k *Kind) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
@@ -81,6 +87,7 @@ type Usage struct {
Size int64 // provides usage, in bytes, of snapshot
}
// Add the provided usage to the current usage
func (u *Usage) Add(other Usage) {
u.Size += other.Size

View File

@@ -15,6 +15,7 @@ import (
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// WithImageConfig configures the spec to from the configuration of an Image
func WithImageConfig(i Image) SpecOpts {
return func(ctx context.Context, client *Client, _ *containers.Container, s *specs.Spec) error {
var (
@@ -51,6 +52,8 @@ func WithImageConfig(i Image) SpecOpts {
}
}
// WithTTY sets the information on the spec as well as the environment variables for
// using a TTY
func WithTTY(width, height int) SpecOpts {
return func(_ context.Context, _ *Client, _ *containers.Container, s *specs.Spec) error {
s.Process.Terminal = true
@@ -63,6 +66,7 @@ func WithTTY(width, height int) SpecOpts {
}
}
// WithResources sets the provided resources on the spec for task updates
func WithResources(resources *specs.WindowsResources) UpdateTaskOpts {
return func(ctx context.Context, client *Client, r *UpdateTaskInfo) error {
r.Resources = resources

View File

@@ -1,5 +1,8 @@
package sys
// SetOOMScore sets the oom score for the process
//
// Not implemented on Windows
func SetOOMScore(pid, score int) error {
return nil
}

View File

@@ -1,19 +0,0 @@
// +build solaris
package sys
import (
"errors"
)
//Solaris TODO
// GetSubreaper returns the subreaper setting for the calling process
func GetSubreaper() (int, error) {
return 0, errors.New("osutils GetSubreaper not implemented on Solaris")
}
// SetSubreaper sets the value i as the subreaper setting for the calling process
func SetSubreaper(i int) error {
return errors.New("osutils SetSubreaper not implemented on Solaris")
}

View File

@@ -6,14 +6,17 @@ import (
"syscall"
)
// StatAtime returns the Atim
func StatAtime(st *syscall.Stat_t) syscall.Timespec {
return st.Atim
}
// StatCtime returns the Ctim
func StatCtime(st *syscall.Stat_t) syscall.Timespec {
return st.Ctim
}
// StatMtime returns the Mtim
func StatMtime(st *syscall.Stat_t) syscall.Timespec {
return st.Mtim
}

View File

@@ -23,7 +23,7 @@ github.com/stretchr/testify v1.1.4
github.com/davecgh/go-spew v1.1.0
github.com/pmezard/go-difflib v1.0.0
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/urfave/cli 8ba6f23b6e36d03666a14bd9421f5e3efcb59aca
github.com/urfave/cli 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c
golang.org/x/net 7dcfb8076726a3fdd9353b6b8a1f1b6be6811bd6
google.golang.org/grpc v1.3.0
github.com/pkg/errors v0.8.0