Merge pull request #179 from Random-Liu/checkpoint-container-status

Checkpoint container status onto disk.
This commit is contained in:
Lantao Liu
2017-09-06 13:51:38 -07:00
committed by GitHub
19 changed files with 1019 additions and 63 deletions

View File

@@ -204,7 +204,8 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
}()
status := containerstore.Status{CreatedAt: time.Now().UnixNano()}
container, err := containerstore.NewContainer(meta, status,
container, err := containerstore.NewContainer(meta,
containerstore.WithStatus(status, containerRootDir),
containerstore.WithContainer(cntr),
containerstore.WithContainerIO(containerIO),
)

View File

@@ -47,15 +47,17 @@ func TestToCRIContainer(t *testing.T) {
Config: config,
ImageRef: "test-image-ref",
},
containerstore.Status{
Pid: 1234,
CreatedAt: createdAt,
StartedAt: time.Now().UnixNano(),
FinishedAt: time.Now().UnixNano(),
ExitCode: 1,
Reason: "test-reason",
Message: "test-message",
},
containerstore.WithFakeStatus(
containerstore.Status{
Pid: 1234,
CreatedAt: createdAt,
StartedAt: time.Now().UnixNano(),
FinishedAt: time.Now().UnixNano(),
ExitCode: 1,
Reason: "test-reason",
Message: "test-message",
},
),
)
assert.NoError(t, err)
expect := &runtime.Container{
@@ -158,7 +160,10 @@ type containerForTest struct {
}
func (c containerForTest) toContainer() (containerstore.Container, error) {
return containerstore.NewContainer(c.metadata, c.status)
return containerstore.NewContainer(
c.metadata,
containerstore.WithFakeStatus(c.status),
)
}
func TestListContainers(t *testing.T) {

View File

@@ -61,7 +61,7 @@ func TestSetContainerRemoving(t *testing.T) {
t.Logf("TestCase %q", desc)
container, err := containerstore.NewContainer(
containerstore.Metadata{ID: testID},
test.status,
containerstore.WithFakeStatus(test.status),
)
assert.NoError(t, err)
err = setContainerRemoving(container)

View File

@@ -122,7 +122,10 @@ func TestToCRIContainerStatus(t *testing.T) {
status.ExitCode = test.exitCode
status.Reason = test.reason
status.Message = test.message
container, err := containerstore.NewContainer(*metadata, *status)
container, err := containerstore.NewContainer(
*metadata,
containerstore.WithFakeStatus(*status),
)
assert.NoError(t, err)
// Set expectation based on test case.
expected.State = test.expectedState
@@ -172,7 +175,10 @@ func TestContainerStatus(t *testing.T) {
// Update status with test case.
status.FinishedAt = test.finishedAt
status.Reason = test.reason
container, err := containerstore.NewContainer(*metadata, *status)
container, err := containerstore.NewContainer(
*metadata,
containerstore.WithFakeStatus(*status),
)
assert.NoError(t, err)
if test.exist {
assert.NoError(t, c.containerStore.Add(container))

View File

@@ -70,7 +70,7 @@ func TestWaitContainerStop(t *testing.T) {
if test.status != nil {
container, err := containerstore.NewContainer(
containerstore.Metadata{ID: id},
*test.status,
containerstore.WithFakeStatus(*test.status),
)
assert.NoError(t, err)
assert.NoError(t, c.containerStore.Add(container))

View File

@@ -40,34 +40,45 @@ type Container struct {
}
// Opts sets specific information to newly created Container.
type Opts func(*Container)
type Opts func(*Container) error
// WithContainer adds the containerd Container to the internal data store.
func WithContainer(cntr containerd.Container) Opts {
return func(c *Container) {
return func(c *Container) error {
c.Container = cntr
return nil
}
}
// WithContainerIO adds IO into the container.
func WithContainerIO(io *cio.ContainerIO) Opts {
return func(c *Container) {
return func(c *Container) error {
c.IO = io
return nil
}
}
// WithStatus adds status to the container.
func WithStatus(status Status, root string) Opts {
return func(c *Container) error {
s, err := StoreStatus(root, c.ID, status)
if err != nil {
return err
}
c.Status = s
return nil
}
}
// NewContainer creates an internally used container type.
func NewContainer(metadata Metadata, status Status, opts ...Opts) (Container, error) {
s, err := StoreStatus(metadata.ID, status)
if err != nil {
return Container{}, err
}
func NewContainer(metadata Metadata, opts ...Opts) (Container, error) {
c := Container{
Metadata: metadata,
Status: s,
}
for _, o := range opts {
o(&c)
if err := o(&c); err != nil {
return Container{}, err
}
}
return c, nil
}

View File

@@ -100,7 +100,10 @@ func TestContainerStore(t *testing.T) {
assert := assertlib.New(t)
containers := map[string]Container{}
for _, id := range ids {
container, err := NewContainer(metadatas[id], statuses[id])
container, err := NewContainer(
metadatas[id],
WithFakeStatus(statuses[id]),
)
assert.NoError(err)
containers[id] = container
}
@@ -162,11 +165,15 @@ func TestWithContainerIO(t *testing.T) {
}
assert := assertlib.New(t)
c, err := NewContainer(meta, status)
c, err := NewContainer(meta, WithFakeStatus(status))
assert.NoError(err)
assert.Nil(c.IO)
c, err = NewContainer(meta, status, WithContainerIO(&cio.ContainerIO{}))
c, err = NewContainer(
meta,
WithFakeStatus(status),
WithContainerIO(&cio.ContainerIO{}),
)
assert.NoError(err)
assert.NotNil(c.IO)
}

View File

@@ -0,0 +1,54 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import "sync"
// WithFakeStatus adds fake status to the container.
func WithFakeStatus(status Status) Opts {
return func(c *Container) error {
c.Status = &fakeStatusStorage{status: status}
return nil
}
}
// fakeStatusStorage is a fake status storage for testing.
type fakeStatusStorage struct {
sync.RWMutex
status Status
}
func (f *fakeStatusStorage) Get() Status {
f.RLock()
defer f.RUnlock()
return f.status
}
func (f *fakeStatusStorage) Update(u UpdateFunc) error {
f.Lock()
defer f.Unlock()
newStatus, err := u(f.status)
if err != nil {
return err
}
f.status = newStatus
return nil
}
func (f *fakeStatusStorage) Delete() error {
return nil
}

View File

@@ -17,16 +17,19 @@ limitations under the License.
package container
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"github.com/docker/docker/pkg/ioutils"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
// TODO(random-liu): Handle versioning.
// TODO(random-liu): Add checkpoint support.
// version is current version of container status.
const version = "v1" // nolint
// statusVersion is current version of container status.
const statusVersion = "v1" // nolint
// versionedStatus is the internal used versioned container status.
// nolint
@@ -60,19 +63,42 @@ type Status struct {
}
// State returns current state of the container based on the container status.
func (c Status) State() runtime.ContainerState {
if c.FinishedAt != 0 {
func (s Status) State() runtime.ContainerState {
if s.FinishedAt != 0 {
return runtime.ContainerState_CONTAINER_EXITED
}
if c.StartedAt != 0 {
if s.StartedAt != 0 {
return runtime.ContainerState_CONTAINER_RUNNING
}
if c.CreatedAt != 0 {
if s.CreatedAt != 0 {
return runtime.ContainerState_CONTAINER_CREATED
}
return runtime.ContainerState_CONTAINER_UNKNOWN
}
// encode encodes Status into bytes in json format.
func (s *Status) encode() ([]byte, error) {
return json.Marshal(&versionedStatus{
Version: statusVersion,
Status: *s,
})
}
// decode decodes Status from bytes.
func (s *Status) decode(data []byte) error {
versioned := &versionedStatus{}
if err := json.Unmarshal(data, versioned); err != nil {
return err
}
// Handle old version after upgrade.
switch versioned.Version {
case statusVersion:
*s = versioned.Status
return nil
}
return fmt.Errorf("unsupported version")
}
// UpdateFunc is function used to update the container status. If there
// is an error, the update will be rolled back.
type UpdateFunc func(Status) (Status, error)
@@ -99,48 +125,73 @@ type StatusStorage interface {
// StoreStatus creates the storage containing the passed in container status with the
// specified id.
// The status MUST be created in one transaction.
func StoreStatus(id string, status Status) (StatusStorage, error) {
return &statusStorage{status: status}, nil
// TODO(random-liu): Create the data on disk atomically.
func StoreStatus(root, id string, status Status) (StatusStorage, error) {
data, err := status.encode()
if err != nil {
return nil, fmt.Errorf("failed to encode status: %v", err)
}
path := filepath.Join(root, "status")
if err := ioutils.AtomicWriteFile(path, data, 0600); err != nil {
return nil, fmt.Errorf("failed to checkpoint status to %q: %v", path, err)
}
return &statusStorage{
path: path,
status: status,
}, nil
}
// LoadStatus loads container status from checkpoint.
func LoadStatus(id string) (StatusStorage, error) {
// TODO(random-liu): Load container status from disk.
return nil, nil
// LoadStatus loads container status from checkpoint. There shouldn't be threads
// writing to the file during loading.
func LoadStatus(root, id string) (Status, error) {
path := filepath.Join(root, "status")
data, err := ioutil.ReadFile(path)
if err != nil {
return Status{}, fmt.Errorf("failed to read status from %q: %v", path, err)
}
var status Status
if err := status.decode(data); err != nil {
return Status{}, fmt.Errorf("failed to decode status %q: %v", data, err)
}
return status, nil
}
type statusStorage struct {
sync.RWMutex
path string
status Status
}
// Get a copy of container status.
func (m *statusStorage) Get() Status {
m.RLock()
defer m.RUnlock()
return m.status
func (s *statusStorage) Get() Status {
s.RLock()
defer s.RUnlock()
return s.status
}
// Update the container status.
func (m *statusStorage) Update(u UpdateFunc) error {
m.Lock()
defer m.Unlock()
newStatus, err := u(m.status)
func (s *statusStorage) Update(u UpdateFunc) error {
s.Lock()
defer s.Unlock()
newStatus, err := u(s.status)
if err != nil {
return err
}
// TODO(random-liu) *Update* existing status on disk atomically,
// return error if checkpoint failed.
m.status = newStatus
data, err := newStatus.encode()
if err != nil {
return fmt.Errorf("failed to encode status: %v", err)
}
if err := ioutils.AtomicWriteFile(s.path, data, 0600); err != nil {
return fmt.Errorf("failed to checkpoint status to %q: %v", s.path, err)
}
s.status = newStatus
return nil
}
// Delete deletes the container status from disk atomically.
func (m *statusStorage) Delete() error {
// TODO(random-liu): Rename the data on the disk, returns error
// if fails. No lock is needed because file rename is atomic.
// TODO(random-liu): Cleanup temporary files generated, do not
// return error.
return nil
func (s *statusStorage) Delete() error {
temp := filepath.Dir(s.path) + ".del-" + filepath.Base(s.path)
if err := os.Rename(s.path, temp); err != nil && !os.IsNotExist(err) {
return err
}
return os.RemoveAll(temp)
}

View File

@@ -17,11 +17,16 @@ limitations under the License.
package container
import (
"encoding/json"
"errors"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
assertlib "github.com/stretchr/testify/assert"
requirelib "github.com/stretchr/testify/require"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
@@ -60,6 +65,32 @@ func TestContainerState(t *testing.T) {
}
}
func TestStatusEncodeDecode(t *testing.T) {
s := &Status{
Pid: 1234,
CreatedAt: time.Now().UnixNano(),
StartedAt: time.Now().UnixNano(),
FinishedAt: time.Now().UnixNano(),
ExitCode: 1,
Reason: "test-reason",
Message: "test-message",
Removing: true,
}
assert := assertlib.New(t)
data, err := s.encode()
assert.NoError(err)
newS := &Status{}
assert.NoError(newS.decode(data))
assert.Equal(s, newS)
unsupported, err := json.Marshal(&versionedStatus{
Version: "random-test-version",
Status: *s,
})
assert.NoError(err)
assert.Error(newS.decode(unsupported))
}
func TestStatus(t *testing.T) {
testID := "test-id"
testStatus := Status{
@@ -71,12 +102,23 @@ func TestStatus(t *testing.T) {
}
updateErr := errors.New("update error")
assert := assertlib.New(t)
require := requirelib.New(t)
tempDir, err := ioutil.TempDir(os.TempDir(), "status-test")
require.NoError(err)
defer os.RemoveAll(tempDir)
statusFile := filepath.Join(tempDir, "status")
t.Logf("simple store and get")
s, err := StoreStatus(testID, testStatus)
s, err := StoreStatus(tempDir, testID, testStatus)
assert.NoError(err)
old := s.Get()
assert.Equal(testStatus, old)
_, err = os.Stat(statusFile)
assert.NoError(err)
loaded, err := LoadStatus(tempDir, testID)
require.NoError(err)
assert.Equal(testStatus, loaded)
t.Logf("failed update should not take effect")
err = s.Update(func(o Status) (Status, error) {
@@ -85,6 +127,9 @@ func TestStatus(t *testing.T) {
})
assert.Equal(updateErr, err)
assert.Equal(testStatus, s.Get())
loaded, err = LoadStatus(tempDir, testID)
require.NoError(err)
assert.Equal(testStatus, loaded)
t.Logf("successful update should take effect")
err = s.Update(func(o Status) (Status, error) {
@@ -93,9 +138,20 @@ func TestStatus(t *testing.T) {
})
assert.NoError(err)
assert.Equal(updateStatus, s.Get())
loaded, err = LoadStatus(tempDir, testID)
require.NoError(err)
assert.Equal(updateStatus, loaded)
t.Logf("successful update should not affect existing snapshot")
assert.Equal(testStatus, old)
// TODO(random-liu): Test Load and Delete after disc checkpoint is added.
t.Logf("delete status")
assert.NoError(s.Delete())
_, err = LoadStatus(tempDir, testID)
assert.Error(err)
_, err = os.Stat(statusFile)
assert.True(os.IsNotExist(err))
t.Logf("delete status should be idempotent")
assert.NoError(s.Delete())
}