Merge pull request #3160 from sofat1989/imageresume
delete the ingest when any errors happen during resuming status
This commit is contained in:
commit
993fb310f6
@ -33,6 +33,7 @@ import (
|
|||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/filters"
|
"github.com/containerd/containerd/filters"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/containerd/continuity"
|
"github.com/containerd/continuity"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
@ -477,6 +478,35 @@ func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.
|
|||||||
return w, nil // lock is now held by w.
|
return w, nil // lock is now held by w.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *store) resumeStatus(ref string, total int64, digester digest.Digester) (content.Status, error) {
|
||||||
|
path, _, data := s.ingestPaths(ref)
|
||||||
|
status, err := s.status(path)
|
||||||
|
if err != nil {
|
||||||
|
return status, errors.Wrap(err, "failed reading status of resume write")
|
||||||
|
}
|
||||||
|
if ref != status.Ref {
|
||||||
|
// NOTE(stevvooe): This is fairly catastrophic. Either we have some
|
||||||
|
// layout corruption or a hash collision for the ref key.
|
||||||
|
return status, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
if total > 0 && status.Total > 0 && total != status.Total {
|
||||||
|
return status, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes
|
||||||
|
fp, err := os.Open(data)
|
||||||
|
if err != nil {
|
||||||
|
return status, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p := bufPool.Get().(*[]byte)
|
||||||
|
status.Offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
|
||||||
|
bufPool.Put(p)
|
||||||
|
fp.Close()
|
||||||
|
return status, err
|
||||||
|
}
|
||||||
|
|
||||||
// writer provides the main implementation of the Writer method. The caller
|
// writer provides the main implementation of the Writer method. The caller
|
||||||
// must hold the lock correctly and release on error if there is a problem.
|
// must hold the lock correctly and release on error if there is a problem.
|
||||||
func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
|
func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
|
||||||
@ -498,45 +528,25 @@ func (s *store) writer(ctx context.Context, ref string, total int64, expected di
|
|||||||
updatedAt time.Time
|
updatedAt time.Time
|
||||||
)
|
)
|
||||||
|
|
||||||
|
foundValidIngest := false
|
||||||
// ensure that the ingest path has been created.
|
// ensure that the ingest path has been created.
|
||||||
if err := os.Mkdir(path, 0755); err != nil {
|
if err := os.Mkdir(path, 0755); err != nil {
|
||||||
if !os.IsExist(err) {
|
if !os.IsExist(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
status, err := s.resumeStatus(ref, total, digester)
|
||||||
status, err := s.status(path)
|
if err == nil {
|
||||||
if err != nil {
|
foundValidIngest = true
|
||||||
return nil, errors.Wrap(err, "failed reading status of resume write")
|
updatedAt = status.UpdatedAt
|
||||||
|
startedAt = status.StartedAt
|
||||||
|
total = status.Total
|
||||||
|
offset = status.Offset
|
||||||
|
} else {
|
||||||
|
logrus.Infof("failed to resume the status from path %s: %s. will recreate them", path, err.Error())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if ref != status.Ref {
|
if !foundValidIngest {
|
||||||
// NOTE(stevvooe): This is fairly catastrophic. Either we have some
|
|
||||||
// layout corruption or a hash collision for the ref key.
|
|
||||||
return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref)
|
|
||||||
}
|
|
||||||
|
|
||||||
if total > 0 && status.Total > 0 && total != status.Total {
|
|
||||||
return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes
|
|
||||||
fp, err := os.Open(data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
p := bufPool.Get().(*[]byte)
|
|
||||||
offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
|
|
||||||
bufPool.Put(p)
|
|
||||||
fp.Close()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
updatedAt = status.UpdatedAt
|
|
||||||
startedAt = status.StartedAt
|
|
||||||
total = status.Total
|
|
||||||
} else {
|
|
||||||
startedAt = time.Now()
|
startedAt = time.Now()
|
||||||
updatedAt = startedAt
|
updatedAt = startedAt
|
||||||
|
|
||||||
@ -546,11 +556,11 @@ func (s *store) writer(ctx context.Context, ref string, total int64, expected di
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
|
if err := writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
|
if err := writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user