diff --git a/content/local/store.go b/content/local/store.go index 996724057..5503cb56f 100644 --- a/content/local/store.go +++ b/content/local/store.go @@ -33,6 +33,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/filters" "github.com/containerd/containerd/log" + "github.com/sirupsen/logrus" "github.com/containerd/continuity" digest "github.com/opencontainers/go-digest" @@ -477,6 +478,35 @@ func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content. return w, nil // lock is now held by w. } +func (s *store) resumeStatus(ref string, total int64, digester digest.Digester) (content.Status, error) { + path, _, data := s.ingestPaths(ref) + status, err := s.status(path) + if err != nil { + return status, errors.Wrap(err, "failed reading status of resume write") + } + if ref != status.Ref { + // NOTE(stevvooe): This is fairly catastrophic. Either we have some + // layout corruption or a hash collision for the ref key. + return status, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref) + } + + if total > 0 && status.Total > 0 && total != status.Total { + return status, errors.Errorf("provided total differs from status: %v != %v", total, status.Total) + } + + // TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes + fp, err := os.Open(data) + if err != nil { + return status, err + } + + p := bufPool.Get().(*[]byte) + status.Offset, err = io.CopyBuffer(digester.Hash(), fp, *p) + bufPool.Put(p) + fp.Close() + return status, err +} + // writer provides the main implementation of the Writer method. The caller // must hold the lock correctly and release on error if there is a problem. func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { @@ -498,45 +528,25 @@ func (s *store) writer(ctx context.Context, ref string, total int64, expected di updatedAt time.Time ) + foundValidIngest := false // ensure that the ingest path has been created. if err := os.Mkdir(path, 0755); err != nil { if !os.IsExist(err) { return nil, err } - - status, err := s.status(path) - if err != nil { - return nil, errors.Wrap(err, "failed reading status of resume write") + status, err := s.resumeStatus(ref, total, digester) + if err == nil { + foundValidIngest = true + updatedAt = status.UpdatedAt + startedAt = status.StartedAt + total = status.Total + offset = status.Offset + } else { + logrus.Infof("failed to resume the status from path %s: %s. will recreate them", path, err.Error()) } + } - if ref != status.Ref { - // NOTE(stevvooe): This is fairly catastrophic. Either we have some - // layout corruption or a hash collision for the ref key. - return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref) - } - - if total > 0 && status.Total > 0 && total != status.Total { - return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total) - } - - // TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes - fp, err := os.Open(data) - if err != nil { - return nil, err - } - - p := bufPool.Get().(*[]byte) - offset, err = io.CopyBuffer(digester.Hash(), fp, *p) - bufPool.Put(p) - fp.Close() - if err != nil { - return nil, err - } - - updatedAt = status.UpdatedAt - startedAt = status.StartedAt - total = status.Total - } else { + if !foundValidIngest { startedAt = time.Now() updatedAt = startedAt @@ -546,11 +556,11 @@ func (s *store) writer(ctx context.Context, ref string, total int64, expected di return nil, err } - if writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil { + if err := writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil { return nil, err } - if writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil { + if err := writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil { return nil, err }