diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index 564c6b42f..19955fbbb 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -204,7 +204,8 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C }() status := containerstore.Status{CreatedAt: time.Now().UnixNano()} - container, err := containerstore.NewContainer(meta, status, + container, err := containerstore.NewContainer(meta, + containerstore.WithStatus(status, containerRootDir), containerstore.WithContainer(cntr), containerstore.WithContainerIO(containerIO), ) diff --git a/pkg/server/container_list_test.go b/pkg/server/container_list_test.go index 3570afca1..69c0f8142 100644 --- a/pkg/server/container_list_test.go +++ b/pkg/server/container_list_test.go @@ -47,15 +47,17 @@ func TestToCRIContainer(t *testing.T) { Config: config, ImageRef: "test-image-ref", }, - containerstore.Status{ - Pid: 1234, - CreatedAt: createdAt, - StartedAt: time.Now().UnixNano(), - FinishedAt: time.Now().UnixNano(), - ExitCode: 1, - Reason: "test-reason", - Message: "test-message", - }, + containerstore.WithFakeStatus( + containerstore.Status{ + Pid: 1234, + CreatedAt: createdAt, + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + ExitCode: 1, + Reason: "test-reason", + Message: "test-message", + }, + ), ) assert.NoError(t, err) expect := &runtime.Container{ @@ -158,7 +160,10 @@ type containerForTest struct { } func (c containerForTest) toContainer() (containerstore.Container, error) { - return containerstore.NewContainer(c.metadata, c.status) + return containerstore.NewContainer( + c.metadata, + containerstore.WithFakeStatus(c.status), + ) } func TestListContainers(t *testing.T) { diff --git a/pkg/server/container_remove_test.go b/pkg/server/container_remove_test.go index 255f6b0b1..57ee49e0b 100644 --- a/pkg/server/container_remove_test.go +++ b/pkg/server/container_remove_test.go @@ -61,7 +61,7 @@ func TestSetContainerRemoving(t *testing.T) { t.Logf("TestCase %q", desc) container, err := containerstore.NewContainer( containerstore.Metadata{ID: testID}, - test.status, + containerstore.WithFakeStatus(test.status), ) assert.NoError(t, err) err = setContainerRemoving(container) diff --git a/pkg/server/container_status_test.go b/pkg/server/container_status_test.go index 7ead96b32..51e2e5fb6 100644 --- a/pkg/server/container_status_test.go +++ b/pkg/server/container_status_test.go @@ -122,7 +122,10 @@ func TestToCRIContainerStatus(t *testing.T) { status.ExitCode = test.exitCode status.Reason = test.reason status.Message = test.message - container, err := containerstore.NewContainer(*metadata, *status) + container, err := containerstore.NewContainer( + *metadata, + containerstore.WithFakeStatus(*status), + ) assert.NoError(t, err) // Set expectation based on test case. expected.State = test.expectedState @@ -172,7 +175,10 @@ func TestContainerStatus(t *testing.T) { // Update status with test case. status.FinishedAt = test.finishedAt status.Reason = test.reason - container, err := containerstore.NewContainer(*metadata, *status) + container, err := containerstore.NewContainer( + *metadata, + containerstore.WithFakeStatus(*status), + ) assert.NoError(t, err) if test.exist { assert.NoError(t, c.containerStore.Add(container)) diff --git a/pkg/server/container_stop_test.go b/pkg/server/container_stop_test.go index 7ebc1ba8a..a1d2cc349 100644 --- a/pkg/server/container_stop_test.go +++ b/pkg/server/container_stop_test.go @@ -70,7 +70,7 @@ func TestWaitContainerStop(t *testing.T) { if test.status != nil { container, err := containerstore.NewContainer( containerstore.Metadata{ID: id}, - *test.status, + containerstore.WithFakeStatus(*test.status), ) assert.NoError(t, err) assert.NoError(t, c.containerStore.Add(container)) diff --git a/pkg/store/container/container.go b/pkg/store/container/container.go index 057b1dd70..11da1debe 100644 --- a/pkg/store/container/container.go +++ b/pkg/store/container/container.go @@ -40,34 +40,45 @@ type Container struct { } // Opts sets specific information to newly created Container. -type Opts func(*Container) +type Opts func(*Container) error // WithContainer adds the containerd Container to the internal data store. func WithContainer(cntr containerd.Container) Opts { - return func(c *Container) { + return func(c *Container) error { c.Container = cntr + return nil } } // WithContainerIO adds IO into the container. func WithContainerIO(io *cio.ContainerIO) Opts { - return func(c *Container) { + return func(c *Container) error { c.IO = io + return nil + } +} + +// WithStatus adds status to the container. +func WithStatus(status Status, root string) Opts { + return func(c *Container) error { + s, err := StoreStatus(root, c.ID, status) + if err != nil { + return err + } + c.Status = s + return nil } } // NewContainer creates an internally used container type. -func NewContainer(metadata Metadata, status Status, opts ...Opts) (Container, error) { - s, err := StoreStatus(metadata.ID, status) - if err != nil { - return Container{}, err - } +func NewContainer(metadata Metadata, opts ...Opts) (Container, error) { c := Container{ Metadata: metadata, - Status: s, } for _, o := range opts { - o(&c) + if err := o(&c); err != nil { + return Container{}, err + } } return c, nil } diff --git a/pkg/store/container/container_test.go b/pkg/store/container/container_test.go index 176d29f61..c3233b305 100644 --- a/pkg/store/container/container_test.go +++ b/pkg/store/container/container_test.go @@ -100,7 +100,10 @@ func TestContainerStore(t *testing.T) { assert := assertlib.New(t) containers := map[string]Container{} for _, id := range ids { - container, err := NewContainer(metadatas[id], statuses[id]) + container, err := NewContainer( + metadatas[id], + WithFakeStatus(statuses[id]), + ) assert.NoError(err) containers[id] = container } @@ -162,11 +165,15 @@ func TestWithContainerIO(t *testing.T) { } assert := assertlib.New(t) - c, err := NewContainer(meta, status) + c, err := NewContainer(meta, WithFakeStatus(status)) assert.NoError(err) assert.Nil(c.IO) - c, err = NewContainer(meta, status, WithContainerIO(&cio.ContainerIO{})) + c, err = NewContainer( + meta, + WithFakeStatus(status), + WithContainerIO(&cio.ContainerIO{}), + ) assert.NoError(err) assert.NotNil(c.IO) } diff --git a/pkg/store/container/fake_status.go b/pkg/store/container/fake_status.go new file mode 100644 index 000000000..2f0f5fbff --- /dev/null +++ b/pkg/store/container/fake_status.go @@ -0,0 +1,54 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import "sync" + +// WithFakeStatus adds fake status to the container. +func WithFakeStatus(status Status) Opts { + return func(c *Container) error { + c.Status = &fakeStatusStorage{status: status} + return nil + } +} + +// fakeStatusStorage is a fake status storage for testing. +type fakeStatusStorage struct { + sync.RWMutex + status Status +} + +func (f *fakeStatusStorage) Get() Status { + f.RLock() + defer f.RUnlock() + return f.status +} + +func (f *fakeStatusStorage) Update(u UpdateFunc) error { + f.Lock() + defer f.Unlock() + newStatus, err := u(f.status) + if err != nil { + return err + } + f.status = newStatus + return nil +} + +func (f *fakeStatusStorage) Delete() error { + return nil +} diff --git a/pkg/store/container/status.go b/pkg/store/container/status.go index 39b8ff6a9..56289e77c 100644 --- a/pkg/store/container/status.go +++ b/pkg/store/container/status.go @@ -17,16 +17,19 @@ limitations under the License. package container import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" "sync" + "github.com/docker/docker/pkg/ioutils" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) -// TODO(random-liu): Handle versioning. -// TODO(random-liu): Add checkpoint support. - -// version is current version of container status. -const version = "v1" // nolint +// statusVersion is current version of container status. +const statusVersion = "v1" // nolint // versionedStatus is the internal used versioned container status. // nolint @@ -60,19 +63,42 @@ type Status struct { } // State returns current state of the container based on the container status. -func (c Status) State() runtime.ContainerState { - if c.FinishedAt != 0 { +func (s Status) State() runtime.ContainerState { + if s.FinishedAt != 0 { return runtime.ContainerState_CONTAINER_EXITED } - if c.StartedAt != 0 { + if s.StartedAt != 0 { return runtime.ContainerState_CONTAINER_RUNNING } - if c.CreatedAt != 0 { + if s.CreatedAt != 0 { return runtime.ContainerState_CONTAINER_CREATED } return runtime.ContainerState_CONTAINER_UNKNOWN } +// encode encodes Status into bytes in json format. +func (s *Status) encode() ([]byte, error) { + return json.Marshal(&versionedStatus{ + Version: statusVersion, + Status: *s, + }) +} + +// decode decodes Status from bytes. +func (s *Status) decode(data []byte) error { + versioned := &versionedStatus{} + if err := json.Unmarshal(data, versioned); err != nil { + return err + } + // Handle old version after upgrade. + switch versioned.Version { + case statusVersion: + *s = versioned.Status + return nil + } + return fmt.Errorf("unsupported version") +} + // UpdateFunc is function used to update the container status. If there // is an error, the update will be rolled back. type UpdateFunc func(Status) (Status, error) @@ -99,48 +125,73 @@ type StatusStorage interface { // StoreStatus creates the storage containing the passed in container status with the // specified id. // The status MUST be created in one transaction. -func StoreStatus(id string, status Status) (StatusStorage, error) { - return &statusStorage{status: status}, nil - // TODO(random-liu): Create the data on disk atomically. +func StoreStatus(root, id string, status Status) (StatusStorage, error) { + data, err := status.encode() + if err != nil { + return nil, fmt.Errorf("failed to encode status: %v", err) + } + path := filepath.Join(root, "status") + if err := ioutils.AtomicWriteFile(path, data, 0600); err != nil { + return nil, fmt.Errorf("failed to checkpoint status to %q: %v", path, err) + } + return &statusStorage{ + path: path, + status: status, + }, nil } -// LoadStatus loads container status from checkpoint. -func LoadStatus(id string) (StatusStorage, error) { - // TODO(random-liu): Load container status from disk. - return nil, nil +// LoadStatus loads container status from checkpoint. There shouldn't be threads +// writing to the file during loading. +func LoadStatus(root, id string) (Status, error) { + path := filepath.Join(root, "status") + data, err := ioutil.ReadFile(path) + if err != nil { + return Status{}, fmt.Errorf("failed to read status from %q: %v", path, err) + } + var status Status + if err := status.decode(data); err != nil { + return Status{}, fmt.Errorf("failed to decode status %q: %v", data, err) + } + return status, nil } type statusStorage struct { sync.RWMutex + path string status Status } // Get a copy of container status. -func (m *statusStorage) Get() Status { - m.RLock() - defer m.RUnlock() - return m.status +func (s *statusStorage) Get() Status { + s.RLock() + defer s.RUnlock() + return s.status } // Update the container status. -func (m *statusStorage) Update(u UpdateFunc) error { - m.Lock() - defer m.Unlock() - newStatus, err := u(m.status) +func (s *statusStorage) Update(u UpdateFunc) error { + s.Lock() + defer s.Unlock() + newStatus, err := u(s.status) if err != nil { return err } - // TODO(random-liu) *Update* existing status on disk atomically, - // return error if checkpoint failed. - m.status = newStatus + data, err := newStatus.encode() + if err != nil { + return fmt.Errorf("failed to encode status: %v", err) + } + if err := ioutils.AtomicWriteFile(s.path, data, 0600); err != nil { + return fmt.Errorf("failed to checkpoint status to %q: %v", s.path, err) + } + s.status = newStatus return nil } // Delete deletes the container status from disk atomically. -func (m *statusStorage) Delete() error { - // TODO(random-liu): Rename the data on the disk, returns error - // if fails. No lock is needed because file rename is atomic. - // TODO(random-liu): Cleanup temporary files generated, do not - // return error. - return nil +func (s *statusStorage) Delete() error { + temp := filepath.Dir(s.path) + ".del-" + filepath.Base(s.path) + if err := os.Rename(s.path, temp); err != nil && !os.IsNotExist(err) { + return err + } + return os.RemoveAll(temp) } diff --git a/pkg/store/container/status_test.go b/pkg/store/container/status_test.go index 8549bf7de..bd3537875 100644 --- a/pkg/store/container/status_test.go +++ b/pkg/store/container/status_test.go @@ -17,11 +17,16 @@ limitations under the License. package container import ( + "encoding/json" "errors" + "io/ioutil" + "os" + "path/filepath" "testing" "time" assertlib "github.com/stretchr/testify/assert" + requirelib "github.com/stretchr/testify/require" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) @@ -60,6 +65,32 @@ func TestContainerState(t *testing.T) { } } +func TestStatusEncodeDecode(t *testing.T) { + s := &Status{ + Pid: 1234, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + ExitCode: 1, + Reason: "test-reason", + Message: "test-message", + Removing: true, + } + assert := assertlib.New(t) + data, err := s.encode() + assert.NoError(err) + newS := &Status{} + assert.NoError(newS.decode(data)) + assert.Equal(s, newS) + + unsupported, err := json.Marshal(&versionedStatus{ + Version: "random-test-version", + Status: *s, + }) + assert.NoError(err) + assert.Error(newS.decode(unsupported)) +} + func TestStatus(t *testing.T) { testID := "test-id" testStatus := Status{ @@ -71,12 +102,23 @@ func TestStatus(t *testing.T) { } updateErr := errors.New("update error") assert := assertlib.New(t) + require := requirelib.New(t) + + tempDir, err := ioutil.TempDir(os.TempDir(), "status-test") + require.NoError(err) + defer os.RemoveAll(tempDir) + statusFile := filepath.Join(tempDir, "status") t.Logf("simple store and get") - s, err := StoreStatus(testID, testStatus) + s, err := StoreStatus(tempDir, testID, testStatus) assert.NoError(err) old := s.Get() assert.Equal(testStatus, old) + _, err = os.Stat(statusFile) + assert.NoError(err) + loaded, err := LoadStatus(tempDir, testID) + require.NoError(err) + assert.Equal(testStatus, loaded) t.Logf("failed update should not take effect") err = s.Update(func(o Status) (Status, error) { @@ -85,6 +127,9 @@ func TestStatus(t *testing.T) { }) assert.Equal(updateErr, err) assert.Equal(testStatus, s.Get()) + loaded, err = LoadStatus(tempDir, testID) + require.NoError(err) + assert.Equal(testStatus, loaded) t.Logf("successful update should take effect") err = s.Update(func(o Status) (Status, error) { @@ -93,9 +138,20 @@ func TestStatus(t *testing.T) { }) assert.NoError(err) assert.Equal(updateStatus, s.Get()) + loaded, err = LoadStatus(tempDir, testID) + require.NoError(err) + assert.Equal(updateStatus, loaded) t.Logf("successful update should not affect existing snapshot") assert.Equal(testStatus, old) - // TODO(random-liu): Test Load and Delete after disc checkpoint is added. + t.Logf("delete status") + assert.NoError(s.Delete()) + _, err = LoadStatus(tempDir, testID) + assert.Error(err) + _, err = os.Stat(statusFile) + assert.True(os.IsNotExist(err)) + + t.Logf("delete status should be idempotent") + assert.NoError(s.Delete()) } diff --git a/vendor/github.com/docker/docker/pkg/ioutils/buffer.go b/vendor/github.com/docker/docker/pkg/ioutils/buffer.go new file mode 100644 index 000000000..3d737b3e1 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/ioutils/buffer.go @@ -0,0 +1,51 @@ +package ioutils + +import ( + "errors" + "io" +) + +var errBufferFull = errors.New("buffer is full") + +type fixedBuffer struct { + buf []byte + pos int + lastRead int +} + +func (b *fixedBuffer) Write(p []byte) (int, error) { + n := copy(b.buf[b.pos:cap(b.buf)], p) + b.pos += n + + if n < len(p) { + if b.pos == cap(b.buf) { + return n, errBufferFull + } + return n, io.ErrShortWrite + } + return n, nil +} + +func (b *fixedBuffer) Read(p []byte) (int, error) { + n := copy(p, b.buf[b.lastRead:b.pos]) + b.lastRead += n + return n, nil +} + +func (b *fixedBuffer) Len() int { + return b.pos - b.lastRead +} + +func (b *fixedBuffer) Cap() int { + return cap(b.buf) +} + +func (b *fixedBuffer) Reset() { + b.pos = 0 + b.lastRead = 0 + b.buf = b.buf[:0] +} + +func (b *fixedBuffer) String() string { + return string(b.buf[b.lastRead:b.pos]) +} diff --git a/vendor/github.com/docker/docker/pkg/ioutils/bytespipe.go b/vendor/github.com/docker/docker/pkg/ioutils/bytespipe.go new file mode 100644 index 000000000..72a04f349 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/ioutils/bytespipe.go @@ -0,0 +1,186 @@ +package ioutils + +import ( + "errors" + "io" + "sync" +) + +// maxCap is the highest capacity to use in byte slices that buffer data. +const maxCap = 1e6 + +// minCap is the lowest capacity to use in byte slices that buffer data +const minCap = 64 + +// blockThreshold is the minimum number of bytes in the buffer which will cause +// a write to BytesPipe to block when allocating a new slice. +const blockThreshold = 1e6 + +var ( + // ErrClosed is returned when Write is called on a closed BytesPipe. + ErrClosed = errors.New("write to closed BytesPipe") + + bufPools = make(map[int]*sync.Pool) + bufPoolsLock sync.Mutex +) + +// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue). +// All written data may be read at most once. Also, BytesPipe allocates +// and releases new byte slices to adjust to current needs, so the buffer +// won't be overgrown after peak loads. +type BytesPipe struct { + mu sync.Mutex + wait *sync.Cond + buf []*fixedBuffer + bufLen int + closeErr error // error to return from next Read. set to nil if not closed. +} + +// NewBytesPipe creates new BytesPipe, initialized by specified slice. +// If buf is nil, then it will be initialized with slice which cap is 64. +// buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf). +func NewBytesPipe() *BytesPipe { + bp := &BytesPipe{} + bp.buf = append(bp.buf, getBuffer(minCap)) + bp.wait = sync.NewCond(&bp.mu) + return bp +} + +// Write writes p to BytesPipe. +// It can allocate new []byte slices in a process of writing. +func (bp *BytesPipe) Write(p []byte) (int, error) { + bp.mu.Lock() + + written := 0 +loop0: + for { + if bp.closeErr != nil { + bp.mu.Unlock() + return written, ErrClosed + } + + if len(bp.buf) == 0 { + bp.buf = append(bp.buf, getBuffer(64)) + } + // get the last buffer + b := bp.buf[len(bp.buf)-1] + + n, err := b.Write(p) + written += n + bp.bufLen += n + + // errBufferFull is an error we expect to get if the buffer is full + if err != nil && err != errBufferFull { + bp.wait.Broadcast() + bp.mu.Unlock() + return written, err + } + + // if there was enough room to write all then break + if len(p) == n { + break + } + + // more data: write to the next slice + p = p[n:] + + // make sure the buffer doesn't grow too big from this write + for bp.bufLen >= blockThreshold { + bp.wait.Wait() + if bp.closeErr != nil { + continue loop0 + } + } + + // add new byte slice to the buffers slice and continue writing + nextCap := b.Cap() * 2 + if nextCap > maxCap { + nextCap = maxCap + } + bp.buf = append(bp.buf, getBuffer(nextCap)) + } + bp.wait.Broadcast() + bp.mu.Unlock() + return written, nil +} + +// CloseWithError causes further reads from a BytesPipe to return immediately. +func (bp *BytesPipe) CloseWithError(err error) error { + bp.mu.Lock() + if err != nil { + bp.closeErr = err + } else { + bp.closeErr = io.EOF + } + bp.wait.Broadcast() + bp.mu.Unlock() + return nil +} + +// Close causes further reads from a BytesPipe to return immediately. +func (bp *BytesPipe) Close() error { + return bp.CloseWithError(nil) +} + +// Read reads bytes from BytesPipe. +// Data could be read only once. +func (bp *BytesPipe) Read(p []byte) (n int, err error) { + bp.mu.Lock() + if bp.bufLen == 0 { + if bp.closeErr != nil { + bp.mu.Unlock() + return 0, bp.closeErr + } + bp.wait.Wait() + if bp.bufLen == 0 && bp.closeErr != nil { + err := bp.closeErr + bp.mu.Unlock() + return 0, err + } + } + + for bp.bufLen > 0 { + b := bp.buf[0] + read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error + n += read + bp.bufLen -= read + + if b.Len() == 0 { + // it's empty so return it to the pool and move to the next one + returnBuffer(b) + bp.buf[0] = nil + bp.buf = bp.buf[1:] + } + + if len(p) == read { + break + } + + p = p[read:] + } + + bp.wait.Broadcast() + bp.mu.Unlock() + return +} + +func returnBuffer(b *fixedBuffer) { + b.Reset() + bufPoolsLock.Lock() + pool := bufPools[b.Cap()] + bufPoolsLock.Unlock() + if pool != nil { + pool.Put(b) + } +} + +func getBuffer(size int) *fixedBuffer { + bufPoolsLock.Lock() + pool, ok := bufPools[size] + if !ok { + pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }} + bufPools[size] = pool + } + bufPoolsLock.Unlock() + return pool.Get().(*fixedBuffer) +} diff --git a/vendor/github.com/docker/docker/pkg/ioutils/fswriters.go b/vendor/github.com/docker/docker/pkg/ioutils/fswriters.go new file mode 100644 index 000000000..a56c46265 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/ioutils/fswriters.go @@ -0,0 +1,162 @@ +package ioutils + +import ( + "io" + "io/ioutil" + "os" + "path/filepath" +) + +// NewAtomicFileWriter returns WriteCloser so that writing to it writes to a +// temporary file and closing it atomically changes the temporary file to +// destination path. Writing and closing concurrently is not allowed. +func NewAtomicFileWriter(filename string, perm os.FileMode) (io.WriteCloser, error) { + f, err := ioutil.TempFile(filepath.Dir(filename), ".tmp-"+filepath.Base(filename)) + if err != nil { + return nil, err + } + + abspath, err := filepath.Abs(filename) + if err != nil { + return nil, err + } + return &atomicFileWriter{ + f: f, + fn: abspath, + perm: perm, + }, nil +} + +// AtomicWriteFile atomically writes data to a file named by filename. +func AtomicWriteFile(filename string, data []byte, perm os.FileMode) error { + f, err := NewAtomicFileWriter(filename, perm) + if err != nil { + return err + } + n, err := f.Write(data) + if err == nil && n < len(data) { + err = io.ErrShortWrite + f.(*atomicFileWriter).writeErr = err + } + if err1 := f.Close(); err == nil { + err = err1 + } + return err +} + +type atomicFileWriter struct { + f *os.File + fn string + writeErr error + perm os.FileMode +} + +func (w *atomicFileWriter) Write(dt []byte) (int, error) { + n, err := w.f.Write(dt) + if err != nil { + w.writeErr = err + } + return n, err +} + +func (w *atomicFileWriter) Close() (retErr error) { + defer func() { + if retErr != nil || w.writeErr != nil { + os.Remove(w.f.Name()) + } + }() + if err := w.f.Sync(); err != nil { + w.f.Close() + return err + } + if err := w.f.Close(); err != nil { + return err + } + if err := os.Chmod(w.f.Name(), w.perm); err != nil { + return err + } + if w.writeErr == nil { + return os.Rename(w.f.Name(), w.fn) + } + return nil +} + +// AtomicWriteSet is used to atomically write a set +// of files and ensure they are visible at the same time. +// Must be committed to a new directory. +type AtomicWriteSet struct { + root string +} + +// NewAtomicWriteSet creates a new atomic write set to +// atomically create a set of files. The given directory +// is used as the base directory for storing files before +// commit. If no temporary directory is given the system +// default is used. +func NewAtomicWriteSet(tmpDir string) (*AtomicWriteSet, error) { + td, err := ioutil.TempDir(tmpDir, "write-set-") + if err != nil { + return nil, err + } + + return &AtomicWriteSet{ + root: td, + }, nil +} + +// WriteFile writes a file to the set, guaranteeing the file +// has been synced. +func (ws *AtomicWriteSet) WriteFile(filename string, data []byte, perm os.FileMode) error { + f, err := ws.FileWriter(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + n, err := f.Write(data) + if err == nil && n < len(data) { + err = io.ErrShortWrite + } + if err1 := f.Close(); err == nil { + err = err1 + } + return err +} + +type syncFileCloser struct { + *os.File +} + +func (w syncFileCloser) Close() error { + err := w.File.Sync() + if err1 := w.File.Close(); err == nil { + err = err1 + } + return err +} + +// FileWriter opens a file writer inside the set. The file +// should be synced and closed before calling commit. +func (ws *AtomicWriteSet) FileWriter(name string, flag int, perm os.FileMode) (io.WriteCloser, error) { + f, err := os.OpenFile(filepath.Join(ws.root, name), flag, perm) + if err != nil { + return nil, err + } + return syncFileCloser{f}, nil +} + +// Cancel cancels the set and removes all temporary data +// created in the set. +func (ws *AtomicWriteSet) Cancel() error { + return os.RemoveAll(ws.root) +} + +// Commit moves all created files to the target directory. The +// target directory must not exist and the parent of the target +// directory must exist. +func (ws *AtomicWriteSet) Commit(target string) error { + return os.Rename(ws.root, target) +} + +// String returns the location the set is writing to. +func (ws *AtomicWriteSet) String() string { + return ws.root +} diff --git a/vendor/github.com/docker/docker/pkg/ioutils/readers.go b/vendor/github.com/docker/docker/pkg/ioutils/readers.go new file mode 100644 index 000000000..63f3c07f4 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/ioutils/readers.go @@ -0,0 +1,154 @@ +package ioutils + +import ( + "crypto/sha256" + "encoding/hex" + "io" + + "golang.org/x/net/context" +) + +type readCloserWrapper struct { + io.Reader + closer func() error +} + +func (r *readCloserWrapper) Close() error { + return r.closer() +} + +// NewReadCloserWrapper returns a new io.ReadCloser. +func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser { + return &readCloserWrapper{ + Reader: r, + closer: closer, + } +} + +type readerErrWrapper struct { + reader io.Reader + closer func() +} + +func (r *readerErrWrapper) Read(p []byte) (int, error) { + n, err := r.reader.Read(p) + if err != nil { + r.closer() + } + return n, err +} + +// NewReaderErrWrapper returns a new io.Reader. +func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader { + return &readerErrWrapper{ + reader: r, + closer: closer, + } +} + +// HashData returns the sha256 sum of src. +func HashData(src io.Reader) (string, error) { + h := sha256.New() + if _, err := io.Copy(h, src); err != nil { + return "", err + } + return "sha256:" + hex.EncodeToString(h.Sum(nil)), nil +} + +// OnEOFReader wraps an io.ReadCloser and a function +// the function will run at the end of file or close the file. +type OnEOFReader struct { + Rc io.ReadCloser + Fn func() +} + +func (r *OnEOFReader) Read(p []byte) (n int, err error) { + n, err = r.Rc.Read(p) + if err == io.EOF { + r.runFunc() + } + return +} + +// Close closes the file and run the function. +func (r *OnEOFReader) Close() error { + err := r.Rc.Close() + r.runFunc() + return err +} + +func (r *OnEOFReader) runFunc() { + if fn := r.Fn; fn != nil { + fn() + r.Fn = nil + } +} + +// cancelReadCloser wraps an io.ReadCloser with a context for cancelling read +// operations. +type cancelReadCloser struct { + cancel func() + pR *io.PipeReader // Stream to read from + pW *io.PipeWriter +} + +// NewCancelReadCloser creates a wrapper that closes the ReadCloser when the +// context is cancelled. The returned io.ReadCloser must be closed when it is +// no longer needed. +func NewCancelReadCloser(ctx context.Context, in io.ReadCloser) io.ReadCloser { + pR, pW := io.Pipe() + + // Create a context used to signal when the pipe is closed + doneCtx, cancel := context.WithCancel(context.Background()) + + p := &cancelReadCloser{ + cancel: cancel, + pR: pR, + pW: pW, + } + + go func() { + _, err := io.Copy(pW, in) + select { + case <-ctx.Done(): + // If the context was closed, p.closeWithError + // was already called. Calling it again would + // change the error that Read returns. + default: + p.closeWithError(err) + } + in.Close() + }() + go func() { + for { + select { + case <-ctx.Done(): + p.closeWithError(ctx.Err()) + case <-doneCtx.Done(): + return + } + } + }() + + return p +} + +// Read wraps the Read method of the pipe that provides data from the wrapped +// ReadCloser. +func (p *cancelReadCloser) Read(buf []byte) (n int, err error) { + return p.pR.Read(buf) +} + +// closeWithError closes the wrapper and its underlying reader. It will +// cause future calls to Read to return err. +func (p *cancelReadCloser) closeWithError(err error) { + p.pW.CloseWithError(err) + p.cancel() +} + +// Close closes the wrapper its underlying reader. It will cause +// future calls to Read to return io.EOF. +func (p *cancelReadCloser) Close() error { + p.closeWithError(io.EOF) + return nil +} diff --git a/vendor/github.com/docker/docker/pkg/ioutils/temp_unix.go b/vendor/github.com/docker/docker/pkg/ioutils/temp_unix.go new file mode 100644 index 000000000..1539ad21b --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/ioutils/temp_unix.go @@ -0,0 +1,10 @@ +// +build !windows + +package ioutils + +import "io/ioutil" + +// TempDir on Unix systems is equivalent to ioutil.TempDir. +func TempDir(dir, prefix string) (string, error) { + return ioutil.TempDir(dir, prefix) +} diff --git a/vendor/github.com/docker/docker/pkg/ioutils/temp_windows.go b/vendor/github.com/docker/docker/pkg/ioutils/temp_windows.go new file mode 100644 index 000000000..c258e5fdd --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/ioutils/temp_windows.go @@ -0,0 +1,18 @@ +// +build windows + +package ioutils + +import ( + "io/ioutil" + + "github.com/docker/docker/pkg/longpath" +) + +// TempDir is the equivalent of ioutil.TempDir, except that the result is in Windows longpath format. +func TempDir(dir, prefix string) (string, error) { + tempDir, err := ioutil.TempDir(dir, prefix) + if err != nil { + return "", err + } + return longpath.AddPrefix(tempDir), nil +} diff --git a/vendor/github.com/docker/docker/pkg/ioutils/writeflusher.go b/vendor/github.com/docker/docker/pkg/ioutils/writeflusher.go new file mode 100644 index 000000000..52a4901ad --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/ioutils/writeflusher.go @@ -0,0 +1,92 @@ +package ioutils + +import ( + "io" + "sync" +) + +// WriteFlusher wraps the Write and Flush operation ensuring that every write +// is a flush. In addition, the Close method can be called to intercept +// Read/Write calls if the targets lifecycle has already ended. +type WriteFlusher struct { + w io.Writer + flusher flusher + flushed chan struct{} + flushedOnce sync.Once + closed chan struct{} + closeLock sync.Mutex +} + +type flusher interface { + Flush() +} + +var errWriteFlusherClosed = io.EOF + +func (wf *WriteFlusher) Write(b []byte) (n int, err error) { + select { + case <-wf.closed: + return 0, errWriteFlusherClosed + default: + } + + n, err = wf.w.Write(b) + wf.Flush() // every write is a flush. + return n, err +} + +// Flush the stream immediately. +func (wf *WriteFlusher) Flush() { + select { + case <-wf.closed: + return + default: + } + + wf.flushedOnce.Do(func() { + close(wf.flushed) + }) + wf.flusher.Flush() +} + +// Flushed returns the state of flushed. +// If it's flushed, return true, or else it return false. +func (wf *WriteFlusher) Flushed() bool { + // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to + // be used to detect whether or a response code has been issued or not. + // Another hook should be used instead. + var flushed bool + select { + case <-wf.flushed: + flushed = true + default: + } + return flushed +} + +// Close closes the write flusher, disallowing any further writes to the +// target. After the flusher is closed, all calls to write or flush will +// result in an error. +func (wf *WriteFlusher) Close() error { + wf.closeLock.Lock() + defer wf.closeLock.Unlock() + + select { + case <-wf.closed: + return errWriteFlusherClosed + default: + close(wf.closed) + } + return nil +} + +// NewWriteFlusher returns a new WriteFlusher. +func NewWriteFlusher(w io.Writer) *WriteFlusher { + var fl flusher + if f, ok := w.(flusher); ok { + fl = f + } else { + fl = &NopFlusher{} + } + return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})} +} diff --git a/vendor/github.com/docker/docker/pkg/ioutils/writers.go b/vendor/github.com/docker/docker/pkg/ioutils/writers.go new file mode 100644 index 000000000..ccc7f9c23 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/ioutils/writers.go @@ -0,0 +1,66 @@ +package ioutils + +import "io" + +// NopWriter represents a type which write operation is nop. +type NopWriter struct{} + +func (*NopWriter) Write(buf []byte) (int, error) { + return len(buf), nil +} + +type nopWriteCloser struct { + io.Writer +} + +func (w *nopWriteCloser) Close() error { return nil } + +// NopWriteCloser returns a nopWriteCloser. +func NopWriteCloser(w io.Writer) io.WriteCloser { + return &nopWriteCloser{w} +} + +// NopFlusher represents a type which flush operation is nop. +type NopFlusher struct{} + +// Flush is a nop operation. +func (f *NopFlusher) Flush() {} + +type writeCloserWrapper struct { + io.Writer + closer func() error +} + +func (r *writeCloserWrapper) Close() error { + return r.closer() +} + +// NewWriteCloserWrapper returns a new io.WriteCloser. +func NewWriteCloserWrapper(r io.Writer, closer func() error) io.WriteCloser { + return &writeCloserWrapper{ + Writer: r, + closer: closer, + } +} + +// WriteCounter wraps a concrete io.Writer and hold a count of the number +// of bytes written to the writer during a "session". +// This can be convenient when write return is masked +// (e.g., json.Encoder.Encode()) +type WriteCounter struct { + Count int64 + Writer io.Writer +} + +// NewWriteCounter returns a new WriteCounter. +func NewWriteCounter(w io.Writer) *WriteCounter { + return &WriteCounter{ + Writer: w, + } +} + +func (wc *WriteCounter) Write(p []byte) (count int, err error) { + count, err = wc.Writer.Write(p) + wc.Count += int64(count) + return +} diff --git a/vendor/github.com/docker/docker/pkg/longpath/longpath.go b/vendor/github.com/docker/docker/pkg/longpath/longpath.go new file mode 100644 index 000000000..9b15bfff4 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/longpath/longpath.go @@ -0,0 +1,26 @@ +// longpath introduces some constants and helper functions for handling long paths +// in Windows, which are expected to be prepended with `\\?\` and followed by either +// a drive letter, a UNC server\share, or a volume identifier. + +package longpath + +import ( + "strings" +) + +// Prefix is the longpath prefix for Windows file paths. +const Prefix = `\\?\` + +// AddPrefix will add the Windows long path prefix to the path provided if +// it does not already have it. +func AddPrefix(path string) string { + if !strings.HasPrefix(path, Prefix) { + if strings.HasPrefix(path, `\\`) { + // This is a UNC path, so we need to add 'UNC' to the path as well. + path = Prefix + `UNC` + path[1:] + } else { + path = Prefix + path + } + } + return path +}