Update apply diff to support context cancellation

Allows cancellation of apply when the grpc service issues a cancel.
Adds a timing log for apply.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2017-10-11 16:47:16 -07:00
parent 587f25245a
commit dcf7ff5db5
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
2 changed files with 59 additions and 4 deletions

View File

@ -107,6 +107,12 @@ func Apply(ctx context.Context, root string, r io.Reader) (int64, error) {
// Iterate through the files in the archive.
for {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
hdr, err := tr.Next()
if err == io.EOF {
// end of tar archive
@ -455,9 +461,8 @@ func createTarFile(ctx context.Context, path, extractDir string, hdr *tar.Header
if err != nil {
return err
}
buf := bufferPool.Get().([]byte)
_, err = io.CopyBuffer(file, reader, buf)
bufferPool.Put(buf)
_, err = copyBuffered(ctx, file, reader)
if err1 := file.Close(); err == nil {
err = err1
}
@ -524,3 +529,41 @@ func createTarFile(ctx context.Context, path, extractDir string, hdr *tar.Header
return chtimes(path, boundTime(latestTime(hdr.AccessTime, hdr.ModTime)), boundTime(hdr.ModTime))
}
func copyBuffered(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error) {
buf := bufferPool.Get().([]byte)
defer bufferPool.Put(buf)
for {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
if er != nil {
if er != io.EOF {
err = er
}
break
}
}
return written, err
}

View File

@ -24,6 +24,7 @@ import (
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
@ -63,7 +64,18 @@ func NewWalkingDiff(store content.Store) (diff.Differ, error) {
// Apply applies the content associated with the provided digests onto the
// provided mounts. Archive content will be extracted and decompressed if
// necessary.
func (s *walkingDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount) (ocispec.Descriptor, error) {
func (s *walkingDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount) (d ocispec.Descriptor, err error) {
t1 := time.Now()
defer func() {
if err == nil {
log.G(ctx).WithFields(logrus.Fields{
"d": time.Now().Sub(t1),
"dgst": desc.Digest,
"size": desc.Size,
"media": desc.MediaType,
}).Debugf("diff applied")
}
}()
var isCompressed bool
switch desc.MediaType {
case ocispec.MediaTypeImageLayer, images.MediaTypeDockerSchema2Layer: