Merge pull request #1475 from dmcgowan/content-commit-context
Add context to content commit
This commit is contained in:
commit
e1eeb0e0a2
@ -332,7 +332,7 @@ var (
|
||||
return err
|
||||
}
|
||||
|
||||
if err := wr.Commit(0, wr.Digest()); err != nil {
|
||||
if err := wr.Commit(ctx, 0, wr.Digest()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -70,7 +70,7 @@ var pushObjectCommand = cli.Command{
|
||||
}
|
||||
|
||||
// TODO: Progress reader
|
||||
if err := content.Copy(cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil {
|
||||
if err := content.Copy(ctx, cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -90,7 +90,7 @@ type Writer interface {
|
||||
|
||||
// Commit commits the blob (but no roll-back is guaranteed on an error).
|
||||
// size and expected can be zero-value when unknown.
|
||||
Commit(size int64, expected digest.Digest, opts ...Opt) error
|
||||
Commit(ctx context.Context, size int64, expected digest.Digest, opts ...Opt) error
|
||||
|
||||
// Status returns the current state of write
|
||||
Status() (Status, error)
|
||||
|
@ -58,7 +58,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
|
||||
}
|
||||
defer cw.Close()
|
||||
|
||||
return Copy(cw, r, size, expected)
|
||||
return Copy(ctx, cw, r, size, expected)
|
||||
}
|
||||
|
||||
// Copy copies data with the expected digest from the reader into the
|
||||
@ -68,7 +68,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
|
||||
// the size or digest is unknown, these values may be empty.
|
||||
//
|
||||
// Copy is buffered, so no need to wrap reader in buffered io.
|
||||
func Copy(cw Writer, r io.Reader, size int64, expected digest.Digest) error {
|
||||
func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest) error {
|
||||
ws, err := cw.Status()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -95,7 +95,7 @@ func Copy(cw Writer, r io.Reader, size int64, expected digest.Digest) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cw.Commit(size, expected); err != nil {
|
||||
if err := cw.Commit(ctx, size, expected); err != nil {
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return errors.Wrapf(err, "failed commit on ref %q", ws.Ref)
|
||||
}
|
||||
|
@ -94,7 +94,7 @@ func TestContentWriter(t *testing.T) {
|
||||
|
||||
checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p))))
|
||||
|
||||
if err := cw.Commit(int64(len(p)), expected); err != nil {
|
||||
if err := cw.Commit(ctx, int64(len(p)), expected); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -109,7 +109,7 @@ func TestContentWriter(t *testing.T) {
|
||||
|
||||
// now, attempt to write the same data again
|
||||
checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p))))
|
||||
if err := cw.Commit(int64(len(p)), expected); err != nil {
|
||||
if err := cw.Commit(ctx, int64(len(p)), expected); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package local
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
@ -54,7 +55,7 @@ func (w *writer) Write(p []byte) (n int, err error) {
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (w *writer) Commit(size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
if w.fp == nil {
|
||||
return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot commit on closed writer")
|
||||
}
|
||||
|
@ -121,7 +121,7 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store
|
||||
}
|
||||
|
||||
preCommit := time.Now()
|
||||
if err := s.writer.Commit(0, ""); err != nil {
|
||||
if err := s.writer.Commit(ctx, 0, ""); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
postCommit := time.Now()
|
||||
@ -201,7 +201,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
}
|
||||
|
||||
preCommit := time.Now()
|
||||
if err := w1.Commit(0, ""); err != nil {
|
||||
if err := w1.Commit(ctx, 0, ""); err != nil {
|
||||
t.Fatalf("Commit failed: %+v", err)
|
||||
}
|
||||
postCommit := time.Now()
|
||||
@ -235,7 +235,7 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
}
|
||||
|
||||
preCommit := time.Now()
|
||||
if err := w1.Commit(0, "", content.WithLabels(labels)); err != nil {
|
||||
if err := w1.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
|
||||
t.Fatalf("Commit failed: %+v", err)
|
||||
}
|
||||
postCommit := time.Now()
|
||||
|
@ -179,7 +179,7 @@ func (s *walkingDiff) DiffMounts(ctx context.Context, lower, upper []mount.Mount
|
||||
}
|
||||
|
||||
dgst := cw.Digest()
|
||||
if err := cw.Commit(0, dgst, opts...); err != nil {
|
||||
if err := cw.Commit(ctx, 0, dgst, opts...); err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to commit")
|
||||
}
|
||||
|
||||
|
@ -352,19 +352,19 @@ type namespacedWriter struct {
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
func (nw *namespacedWriter) Commit(size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
return nw.db.Update(func(tx *bolt.Tx) error {
|
||||
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
return update(ctx, nw.db, func(tx *bolt.Tx) error {
|
||||
bkt := getIngestBucket(tx, nw.namespace)
|
||||
if bkt != nil {
|
||||
if err := bkt.Delete([]byte(nw.ref)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nw.commit(tx, size, expected, opts...)
|
||||
return nw.commit(ctx, tx, size, expected, opts...)
|
||||
})
|
||||
}
|
||||
|
||||
func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
var base content.Info
|
||||
for _, opt := range opts {
|
||||
if err := opt(&base); err != nil {
|
||||
@ -382,7 +382,7 @@ func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Dige
|
||||
|
||||
actual := nw.Writer.Digest()
|
||||
|
||||
if err := nw.Writer.Commit(size, expected); err != nil {
|
||||
if err := nw.Writer.Commit(ctx, size, expected); err != nil {
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
|
@ -221,7 +221,7 @@ func (pw *pushWriter) Digest() digest.Digest {
|
||||
return pw.expected
|
||||
}
|
||||
|
||||
func (pw *pushWriter) Commit(size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
// Check whether read has already thrown an error
|
||||
if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe {
|
||||
return errors.Wrap(err, "pipe error before commit")
|
||||
|
@ -252,7 +252,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
|
||||
|
||||
eg.Go(func() error {
|
||||
defer pw.Close()
|
||||
return content.Copy(cw, io.TeeReader(rc, pw), desc.Size, desc.Digest)
|
||||
return content.Copy(ctx, cw, io.TeeReader(rc, pw), desc.Size, desc.Digest)
|
||||
})
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
|
@ -101,7 +101,7 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
return content.Copy(cw, rc, desc.Size, desc.Digest)
|
||||
return content.Copy(ctx, cw, rc, desc.Size, desc.Digest)
|
||||
}
|
||||
|
||||
// PushHandler returns a handler that will push all content from the provider
|
||||
@ -139,5 +139,5 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc
|
||||
defer ra.Close()
|
||||
|
||||
rd := io.NewSectionReader(ra, 0, desc.Size)
|
||||
return content.Copy(cw, rd, desc.Size, desc.Digest)
|
||||
return content.Copy(ctx, cw, rd, desc.Size, desc.Digest)
|
||||
}
|
||||
|
@ -424,7 +424,7 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
|
||||
if req.Labels != nil {
|
||||
opts = append(opts, content.WithLabels(req.Labels))
|
||||
}
|
||||
if err := wr.Commit(total, expected, opts...); err != nil {
|
||||
if err := wr.Commit(ctx, total, expected, opts...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package content
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
contentapi "github.com/containerd/containerd/api/services/content/v1"
|
||||
@ -80,7 +81,7 @@ func (rw *remoteWriter) Write(p []byte) (n int, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rw *remoteWriter) Commit(size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
var base content.Info
|
||||
for _, opt := range opts {
|
||||
if err := opt(&base); err != nil {
|
||||
|
@ -464,7 +464,7 @@ func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := writer.Commit(0, ""); err != nil {
|
||||
if err := writer.Commit(ctx, 0, ""); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &types.Descriptor{
|
||||
|
Loading…
Reference in New Issue
Block a user