Avoid deadlock in unpacker.
Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
parent
a176179a08
commit
c560591627
5
pull.go
5
pull.go
@ -70,6 +70,11 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
|
||||
}
|
||||
unpackWrapper, eg := u.handlerWrapper(ctx, &unpacks)
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
// Forcibly stop the unpacker if there is
|
||||
// an error.
|
||||
eg.Cancel()
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
if retErr == nil {
|
||||
retErr = errors.Wrap(err, "unpack")
|
||||
|
42
unpacker.go
42
unpacker.go
@ -186,8 +186,32 @@ func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errgroup.Group) {
|
||||
eg, uctx := errgroup.WithContext(uctx)
|
||||
type errGroup struct {
|
||||
*errgroup.Group
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func newErrGroup(ctx context.Context) (*errGroup, context.Context) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
return &errGroup{
|
||||
Group: eg,
|
||||
cancel: cancel,
|
||||
}, ctx
|
||||
}
|
||||
|
||||
func (e *errGroup) Cancel() {
|
||||
e.cancel()
|
||||
}
|
||||
|
||||
func (e *errGroup) Wait() error {
|
||||
err := e.Group.Wait()
|
||||
e.cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errGroup) {
|
||||
eg, uctx := newErrGroup(uctx)
|
||||
return func(f images.Handler) images.Handler {
|
||||
var (
|
||||
lock sync.Mutex
|
||||
@ -234,7 +258,19 @@ func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(im
|
||||
update := !schema1
|
||||
lock.Unlock()
|
||||
if update {
|
||||
u.updateCh <- desc
|
||||
select {
|
||||
case <-uctx.Done():
|
||||
// Do not send update if unpacker is not running.
|
||||
default:
|
||||
select {
|
||||
case u.updateCh <- desc:
|
||||
case <-uctx.Done():
|
||||
// Do not send update if unpacker is not running.
|
||||
}
|
||||
}
|
||||
// Checking ctx.Done() prevents the case that unpacker
|
||||
// exits unexpectedly, but update continues to be generated,
|
||||
// and eventually fills up updateCh and blocks forever.
|
||||
}
|
||||
}
|
||||
return children, nil
|
||||
|
Loading…
Reference in New Issue
Block a user