containerd/content/writer.go
Stephen J Day a4fadc596b
errdefs: centralize error handling
Now that we have most of the services required for use with containerd,
it was found that common patterns were used throughout services. By
defining a central `errdefs` package, we ensure that services will map
errors to and from grpc consistently and cleanly. One can decorate an
error with as much context as necessary, using `pkg/errors` and still
have the error mapped correctly via grpc.

We make a few sacrifices. At this point, the common errors we use across
the repository all map directly to grpc error codes. While this seems
positively crazy, it actually works out quite well. The error conditions
that were specific weren't super necessary and the ones that were
necessary now simply have better context information. We lose the
ability to add new codes, but this constraint may not be a bad thing.

Effectively, as long as one uses the errors defined in `errdefs`, the
error class will be mapped correctly across the grpc boundary and
everything will be good. If you don't use those definitions, the error
maps to "unknown" and the error message is preserved.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-06-29 15:00:47 -07:00

142 lines
3.4 KiB
Go

package content
import (
"os"
"path/filepath"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// writer represents a write transaction against the blob store.
type writer struct {
s *store
fp *os.File // opened data file
path string // path to writer dir
ref string // ref key
offset int64
total int64
digester digest.Digester
startedAt time.Time
updatedAt time.Time
}
func (w *writer) Status() (Status, error) {
return Status{
Ref: w.ref,
Offset: w.offset,
Total: w.total,
StartedAt: w.startedAt,
UpdatedAt: w.updatedAt,
}, nil
}
// Digest returns the current digest of the content, up to the current write.
//
// Cannot be called concurrently with `Write`.
func (w *writer) Digest() digest.Digest {
return w.digester.Digest()
}
// Write p to the transaction.
//
// Note that writes are unbuffered to the backing file. When writing, it is
// recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer.
func (w *writer) Write(p []byte) (n int, err error) {
n, err = w.fp.Write(p)
w.digester.Hash().Write(p[:n])
w.offset += int64(len(p))
w.updatedAt = time.Now()
return n, err
}
func (w *writer) Commit(size int64, expected digest.Digest) error {
if err := w.fp.Sync(); err != nil {
return errors.Wrap(err, "sync failed")
}
fi, err := w.fp.Stat()
if err != nil {
return errors.Wrap(err, "stat on ingest file failed")
}
// change to readonly, more important for read, but provides _some_
// protection from this point on. We use the existing perms with a mask
// only allowing reads honoring the umask on creation.
//
// This removes write and exec, only allowing read per the creation umask.
if err := w.fp.Chmod((fi.Mode() & os.ModePerm) &^ 0333); err != nil {
return errors.Wrap(err, "failed to change ingest file permissions")
}
if size > 0 && size != fi.Size() {
return errors.Errorf("%q failed size validation: %v != %v", w.ref, fi.Size(), size)
}
if err := w.fp.Close(); err != nil {
return errors.Wrap(err, "failed closing ingest")
}
dgst := w.digester.Digest()
if expected != "" && expected != dgst {
return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
}
var (
ingest = filepath.Join(w.path, "data")
target = w.s.blobPath(dgst)
)
// make sure parent directories of blob exist
if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
return err
}
// clean up!!
defer os.RemoveAll(w.path)
if err := os.Rename(ingest, target); err != nil {
if os.IsExist(err) {
// collision with the target file!
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", dgst)
}
return err
}
unlock(w.ref)
w.fp = nil
return nil
}
// Close the writer, flushing any unwritten data and leaving the progress in
// tact.
//
// If one needs to resume the transaction, a new writer can be obtained from
// `ContentStore.Resume` using the same key. The write can then be continued
// from it was left off.
//
// To abandon a transaction completely, first call close then `Store.Remove` to
// clean up the associated resources.
func (cw *writer) Close() (err error) {
unlock(cw.ref)
if cw.fp != nil {
cw.fp.Sync()
return cw.fp.Close()
}
return nil
}
func (w *writer) Truncate(size int64) error {
if size != 0 {
return errors.New("Truncate: unsupported size")
}
w.offset = 0
w.digester.Hash().Reset()
return w.fp.Truncate(0)
}