Push client support
Update dist tool to use client package Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
parent
8ec5c30d83
commit
126aa07ad2
121
client.go
121
client.go
@ -7,6 +7,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/api/services/containers"
|
"github.com/containerd/containerd/api/services/containers"
|
||||||
@ -29,6 +30,7 @@ import (
|
|||||||
protobuf "github.com/gogo/protobuf/types"
|
protobuf "github.com/gogo/protobuf/types"
|
||||||
"github.com/opencontainers/image-spec/identity"
|
"github.com/opencontainers/image-spec/identity"
|
||||||
"github.com/opencontainers/image-spec/specs-go/v1"
|
"github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@ -201,22 +203,42 @@ func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec,
|
|||||||
return containerFromProto(c, r.Container), nil
|
return containerFromProto(c, r.Container), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type PullOpts func(*Client, *PullContext) error
|
type RemoteOpts func(*Client, *RemoteContext) error
|
||||||
|
|
||||||
type PullContext struct {
|
// RemoteContext is used to configure object resolutions and transfers with
|
||||||
|
// remote content stores and image providers.
|
||||||
|
type RemoteContext struct {
|
||||||
|
// Resolver is used to resolve names to objects, fetchers, and pushers.
|
||||||
|
// If no resolver is provided, defaults to Docker registry resolver.
|
||||||
Resolver remotes.Resolver
|
Resolver remotes.Resolver
|
||||||
|
|
||||||
|
// Unpacker is used after an image is pulled to extract into a registry.
|
||||||
|
// If an image is not unpacked on pull, it can be unpacked any time
|
||||||
|
// afterwards. Unpacking is required to run an image.
|
||||||
Unpacker Unpacker
|
Unpacker Unpacker
|
||||||
|
|
||||||
|
// PushWrapper allows hooking into the push method. This can be used
|
||||||
|
// track content that is being sent to the remote.
|
||||||
|
PushWrapper func(remotes.Pusher) remotes.Pusher
|
||||||
|
|
||||||
|
// BaseHandlers are a set of handlers which get are called on dispatch.
|
||||||
|
// These handlers always get called before any operation specific
|
||||||
|
// handlers.
|
||||||
|
BaseHandlers []images.Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultPullContext() *PullContext {
|
func defaultRemoteContext() *RemoteContext {
|
||||||
return &PullContext{
|
return &RemoteContext{
|
||||||
Resolver: docker.NewResolver(docker.ResolverOptions{
|
Resolver: docker.NewResolver(docker.ResolverOptions{
|
||||||
Client: http.DefaultClient,
|
Client: http.DefaultClient,
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithPullUnpack(client *Client, c *PullContext) error {
|
// WithPullUnpack is used to unpack an image after pull. This
|
||||||
|
// uses the snapshotter, content store, and diff service
|
||||||
|
// configured for the client.
|
||||||
|
func WithPullUnpack(client *Client, c *RemoteContext) error {
|
||||||
c.Unpacker = &snapshotUnpacker{
|
c.Unpacker = &snapshotUnpacker{
|
||||||
store: client.ContentStore(),
|
store: client.ContentStore(),
|
||||||
diff: client.DiffService(),
|
diff: client.DiffService(),
|
||||||
@ -225,6 +247,31 @@ func WithPullUnpack(client *Client, c *PullContext) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithResolver specifies the resolver to use.
|
||||||
|
func WithResolver(resolver remotes.Resolver) RemoteOpts {
|
||||||
|
return func(client *Client, c *RemoteContext) error {
|
||||||
|
c.Resolver = resolver
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithImageHandler adds a base handler to be called on dispatch.
|
||||||
|
func WithImageHandler(h images.Handler) RemoteOpts {
|
||||||
|
return func(client *Client, c *RemoteContext) error {
|
||||||
|
c.BaseHandlers = append(c.BaseHandlers, h)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithPushWrapper is used to wrap a pusher to hook into
|
||||||
|
// the push content as it is sent to a remote.
|
||||||
|
func WithPushWrapper(w func(remotes.Pusher) remotes.Pusher) RemoteOpts {
|
||||||
|
return func(client *Client, c *RemoteContext) error {
|
||||||
|
c.PushWrapper = w
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type Unpacker interface {
|
type Unpacker interface {
|
||||||
Unpack(context.Context, images.Image) error
|
Unpack(context.Context, images.Image) error
|
||||||
}
|
}
|
||||||
@ -274,8 +321,8 @@ func (s *snapshotUnpacker) getLayers(ctx context.Context, image images.Image) ([
|
|||||||
return layers, nil
|
return layers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (Image, error) {
|
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Image, error) {
|
||||||
pullCtx := defaultPullContext()
|
pullCtx := defaultRemoteContext()
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
if err := o(c, pullCtx); err != nil {
|
if err := o(c, pullCtx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -292,10 +339,10 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (Image,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
handlers := []images.Handler{
|
handlers := append(pullCtx.BaseHandlers,
|
||||||
remotes.FetchHandler(store, fetcher),
|
remotes.FetchHandler(store, fetcher),
|
||||||
images.ChildrenHandler(store),
|
images.ChildrenHandler(store),
|
||||||
}
|
)
|
||||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
|
if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -330,6 +377,62 @@ func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpts) error {
|
||||||
|
pushCtx := defaultRemoteContext()
|
||||||
|
for _, o := range opts {
|
||||||
|
if err := o(c, pushCtx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pusher, err := pushCtx.Resolver.Pusher(ctx, ref)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if pushCtx.PushWrapper != nil {
|
||||||
|
pusher = pushCtx.PushWrapper(pusher)
|
||||||
|
}
|
||||||
|
|
||||||
|
var m sync.Mutex
|
||||||
|
manifestStack := []ocispec.Descriptor{}
|
||||||
|
|
||||||
|
filterHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||||
|
switch desc.MediaType {
|
||||||
|
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
|
||||||
|
images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
|
||||||
|
m.Lock()
|
||||||
|
manifestStack = append(manifestStack, desc)
|
||||||
|
m.Unlock()
|
||||||
|
return nil, images.StopHandler
|
||||||
|
default:
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
cs := c.ContentStore()
|
||||||
|
pushHandler := remotes.PushHandler(cs, pusher)
|
||||||
|
|
||||||
|
handlers := append(pushCtx.BaseHandlers,
|
||||||
|
images.ChildrenHandler(cs),
|
||||||
|
filterHandler,
|
||||||
|
pushHandler,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate in reverse order as seen, parent always uploaded after child
|
||||||
|
for i := len(manifestStack) - 1; i >= 0; i-- {
|
||||||
|
_, err := pushHandler(ctx, manifestStack[i])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close closes the clients connection to containerd
|
// Close closes the clients connection to containerd
|
||||||
func (c *Client) Close() error {
|
func (c *Client) Close() error {
|
||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
|
8
cmd/dist/common.go
vendored
8
cmd/dist/common.go
vendored
@ -13,6 +13,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/console"
|
"github.com/containerd/console"
|
||||||
|
"github.com/containerd/containerd"
|
||||||
contentapi "github.com/containerd/containerd/api/services/content"
|
contentapi "github.com/containerd/containerd/api/services/content"
|
||||||
imagesapi "github.com/containerd/containerd/api/services/images"
|
imagesapi "github.com/containerd/containerd/api/services/images"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
@ -47,6 +48,13 @@ var registryFlags = []cli.Flag{
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getClient(context *cli.Context) (*containerd.Client, error) {
|
||||||
|
address := context.GlobalString("address")
|
||||||
|
//timeout := context.GlobalDuration("connect-timeout")
|
||||||
|
|
||||||
|
return containerd.New(address)
|
||||||
|
}
|
||||||
|
|
||||||
func resolveContentStore(context *cli.Context) (content.Store, error) {
|
func resolveContentStore(context *cli.Context) (content.Store, error) {
|
||||||
root := filepath.Join(context.GlobalString("root"), "content")
|
root := filepath.Join(context.GlobalString("root"), "content")
|
||||||
if !filepath.IsAbs(root) {
|
if !filepath.IsAbs(root) {
|
||||||
|
53
cmd/dist/push.go
vendored
53
cmd/dist/push.go
vendored
@ -8,6 +8,7 @@ import (
|
|||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd"
|
||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/progress"
|
"github.com/containerd/containerd/progress"
|
||||||
@ -50,12 +51,7 @@ var pushCommand = cli.Command{
|
|||||||
ctx, cancel := appContext()
|
ctx, cancel := appContext()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
cs, err := resolveContentStore(clicontext)
|
client, err := getClient(clicontext)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
imageStore, err := resolveImageStore(clicontext)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -70,7 +66,7 @@ var pushCommand = cli.Command{
|
|||||||
if local == "" {
|
if local == "" {
|
||||||
local = ref
|
local = ref
|
||||||
}
|
}
|
||||||
img, err := imageStore.Get(ctx, local)
|
img, err := client.ImageService().Get(ctx, local)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "unable to resolve image to manifest")
|
return errors.Wrap(err, "unable to resolve image to manifest")
|
||||||
}
|
}
|
||||||
@ -86,11 +82,6 @@ var pushCommand = cli.Command{
|
|||||||
eg, ctx := errgroup.WithContext(ctx)
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
pusher, err := resolver.Pusher(ctx, ref)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.G(ctx).WithField("image", ref).WithField("digest", desc.Digest).Debug("pushing")
|
log.G(ctx).WithField("image", ref).WithField("digest", desc.Digest).Debug("pushing")
|
||||||
|
|
||||||
jobHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
jobHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||||
@ -98,39 +89,11 @@ var pushCommand = cli.Command{
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
pushHandler := remotes.PushHandler(cs, ongoing.wrapPusher(pusher))
|
return client.Push(ctx, ref, desc,
|
||||||
|
containerd.WithResolver(resolver),
|
||||||
var m sync.Mutex
|
containerd.WithImageHandler(jobHandler),
|
||||||
manifestStack := []ocispec.Descriptor{}
|
containerd.WithPushWrapper(ongoing.wrapPusher),
|
||||||
|
)
|
||||||
filterHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
||||||
switch desc.MediaType {
|
|
||||||
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
|
|
||||||
images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
|
|
||||||
m.Lock()
|
|
||||||
manifestStack = append(manifestStack, desc)
|
|
||||||
m.Unlock()
|
|
||||||
return nil, images.StopHandler
|
|
||||||
default:
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
handler := images.Handlers(jobHandler, images.ChildrenHandler(cs), filterHandler, pushHandler)
|
|
||||||
|
|
||||||
if err := images.Dispatch(ctx, handler, desc); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate in reverse order as seen, parent always uploaded after child
|
|
||||||
for i := len(manifestStack) - 1; i >= 0; i-- {
|
|
||||||
_, err := pushHandler(ctx, manifestStack[i])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
|
|
||||||
errs := make(chan error)
|
errs := make(chan error)
|
||||||
|
Loading…
Reference in New Issue
Block a user