Fix PushHandler cannot push image that contains duplicated blobs

Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
This commit is contained in:
ktock
2021-04-16 15:07:44 +09:00
parent f0890f9b3a
commit ab1654d0e2
11 changed files with 445 additions and 5 deletions

View File

@@ -44,17 +44,47 @@ type dockerPusher struct {
tracker StatusTracker
}
// Writer implements Ingester API of content store. This allows the client
// to receive ErrUnavailable when there is already an on-going upload.
// Note that the tracker MUST implement StatusTrackLocker interface to avoid
// race condition on StatusTracker.
func (p dockerPusher) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
if wOpts.Ref == "" {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
}
return p.push(ctx, wOpts.Desc, wOpts.Ref, true)
}
func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) {
return p.push(ctx, desc, remotes.MakeRefKey(ctx, desc), false)
}
func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref string, unavailableOnFail bool) (content.Writer, error) {
if l, ok := p.tracker.(StatusTrackLocker); ok {
l.Lock(ref)
defer l.Unlock(ref)
}
ctx, err := ContextWithRepositoryScope(ctx, p.refspec, true)
if err != nil {
return nil, err
}
ref := remotes.MakeRefKey(ctx, desc)
status, err := p.tracker.GetStatus(ref)
if err == nil {
if status.Committed && status.Offset == status.Total {
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "ref %v", ref)
}
if unavailableOnFail {
// Another push of this ref is happening elsewhere. The rest of function
// will continue only when `errdefs.IsNotFound(err) == true` (i.e. there
// is no actively-tracked ref already).
return nil, errors.Wrap(errdefs.ErrUnavailable, "push is on-going")
}
// TODO: Handle incomplete status
} else if !errdefs.IsNotFound(err) {
return nil, errors.Wrap(err, "failed to get status")
@@ -105,8 +135,11 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
if exists {
p.tracker.SetStatus(ref, Status{
Committed: true,
Status: content.Status{
Ref: ref,
Ref: ref,
Total: desc.Size,
Offset: desc.Size,
// TODO: Set updated time?
},
})
@@ -162,8 +195,11 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
case http.StatusOK, http.StatusAccepted, http.StatusNoContent:
case http.StatusCreated:
p.tracker.SetStatus(ref, Status{
Committed: true,
Status: content.Status{
Ref: ref,
Ref: ref,
Total: desc.Size,
Offset: desc.Size,
},
})
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v on remote", desc.Digest)

View File

@@ -21,6 +21,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/moby/locker"
"github.com/pkg/errors"
)
@@ -40,15 +41,24 @@ type StatusTracker interface {
SetStatus(string, Status)
}
// StatusTrackLocker to track status of operations with lock
type StatusTrackLocker interface {
StatusTracker
Lock(string)
Unlock(string)
}
type memoryStatusTracker struct {
statuses map[string]Status
m sync.Mutex
locker *locker.Locker
}
// NewInMemoryTracker returns a StatusTracker that tracks content status in-memory
func NewInMemoryTracker() StatusTracker {
func NewInMemoryTracker() StatusTrackLocker {
return &memoryStatusTracker{
statuses: map[string]Status{},
locker: locker.New(),
}
}
@@ -67,3 +77,11 @@ func (t *memoryStatusTracker) SetStatus(ref string, status Status) {
t.statuses[ref] = status
t.m.Unlock()
}
func (t *memoryStatusTracker) Lock(ref string) {
t.locker.Lock(ref)
}
func (t *memoryStatusTracker) Unlock(ref string) {
t.locker.Unlock(ref)
}