Add new metadata store.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2017-06-05 17:26:16 +00:00
parent e5d69aa537
commit a393f3a084
13 changed files with 1217 additions and 0 deletions

View File

@@ -0,0 +1,113 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"sync"
"github.com/kubernetes-incubator/cri-containerd/pkg/store"
)
// Container contains all resources associated with the container. All methods to
// mutate the internal state are thread-safe.
type Container struct {
// Metadata is the metadata of the container, it is **immutable** after created.
Metadata
// Status stores the status of the container.
Status StatusStorage
// TODO(random-liu): Add containerd container client.
// TODO(random-liu): Add stop channel to get rid of stop poll waiting.
}
// NewContainer creates an internally used container type.
func NewContainer(metadata Metadata, status Status) (Container, error) {
s, err := StoreStatus(metadata.ID, status)
if err != nil {
return Container{}, err
}
return Container{
Metadata: metadata,
Status: s,
}, nil
}
// Delete deletes checkpoint for the container.
func (c *Container) Delete() error {
return c.Status.Delete()
}
// LoadContainer loads the internal used container type.
func LoadContainer() (Container, error) {
return Container{}, nil
}
// Store stores all Containers.
type Store struct {
lock sync.RWMutex
containers map[string]Container
// TODO(random-liu): Add trunc index.
}
// LoadStore loads containers from runtime.
// TODO(random-liu): Implement LoadStore.
func LoadStore() *Store { return nil }
// NewStore creates a container store.
func NewStore() *Store {
return &Store{containers: make(map[string]Container)}
}
// Add a container into the store. Returns store.ErrAlreadyExist if the
// container already exists.
func (s *Store) Add(c Container) error {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.containers[c.ID]; ok {
return store.ErrAlreadyExist
}
s.containers[c.ID] = c
return nil
}
// Get returns the container with specified id. Returns store.ErrNotExist
// if the container doesn't exist.
func (s *Store) Get(id string) (Container, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if c, ok := s.containers[id]; ok {
return c, nil
}
return Container{}, store.ErrNotExist
}
// List lists all containers.
func (s *Store) List() []Container {
s.lock.RLock()
defer s.lock.RUnlock()
var containers []Container
for _, c := range s.containers {
containers = append(containers, c)
}
return containers
}
// Delete deletes the container from store with specified id.
func (s *Store) Delete(id string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.containers, id)
}

View File

@@ -0,0 +1,138 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"testing"
"time"
assertlib "github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/store"
)
func TestContainerStore(t *testing.T) {
ids := []string{"1", "2", "3"}
metadatas := map[string]Metadata{
"1": {
ID: "1",
Name: "Container-1",
SandboxID: "Sandbox-1",
Config: &runtime.ContainerConfig{
Metadata: &runtime.ContainerMetadata{
Name: "TestPod-1",
Attempt: 1,
},
},
ImageRef: "TestImage-1",
},
"2": {
ID: "2",
Name: "Container-2",
SandboxID: "Sandbox-2",
Config: &runtime.ContainerConfig{
Metadata: &runtime.ContainerMetadata{
Name: "TestPod-2",
Attempt: 2,
},
},
ImageRef: "TestImage-2",
},
"3": {
ID: "3",
Name: "Container-3",
SandboxID: "Sandbox-3",
Config: &runtime.ContainerConfig{
Metadata: &runtime.ContainerMetadata{
Name: "TestPod-3",
Attempt: 3,
},
},
ImageRef: "TestImage-3",
},
}
statuses := map[string]Status{
"1": {
Pid: 1,
CreatedAt: time.Now().UnixNano(),
StartedAt: time.Now().UnixNano(),
FinishedAt: time.Now().UnixNano(),
ExitCode: 1,
Reason: "TestReason-1",
Message: "TestMessage-1",
},
"2": {
Pid: 2,
CreatedAt: time.Now().UnixNano(),
StartedAt: time.Now().UnixNano(),
FinishedAt: time.Now().UnixNano(),
ExitCode: 2,
Reason: "TestReason-2",
Message: "TestMessage-2",
},
"3": {
Pid: 3,
CreatedAt: time.Now().UnixNano(),
StartedAt: time.Now().UnixNano(),
FinishedAt: time.Now().UnixNano(),
ExitCode: 3,
Reason: "TestReason-3",
Message: "TestMessage-3",
Removing: true,
},
}
assert := assertlib.New(t)
containers := map[string]Container{}
for _, id := range ids {
container, err := NewContainer(metadatas[id], statuses[id])
assert.NoError(err)
containers[id] = container
}
s := NewStore()
t.Logf("should be able to add container")
for _, c := range containers {
assert.NoError(s.Add(c))
}
t.Logf("should be able to get container")
for id, c := range containers {
got, err := s.Get(id)
assert.NoError(err)
assert.Equal(c, got)
}
t.Logf("should be able to list containers")
cs := s.List()
assert.Len(cs, 3)
testID := "2"
t.Logf("add should return already exists error for duplicated container")
assert.Equal(store.ErrAlreadyExist, s.Add(containers[testID]))
t.Logf("should be able to delete container")
s.Delete(testID)
cs = s.List()
assert.Len(cs, 2)
t.Logf("get should return not exist error after deletion")
c, err := s.Get(testID)
assert.Equal(Container{}, c)
assert.Equal(store.ErrNotExist, err)
}

View File

@@ -0,0 +1,76 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"encoding/json"
"fmt"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
// NOTE(random-liu):
// 1) Metadata is immutable after created.
// 2) Metadata is checkpointed as containerd container label.
// metadataVersion is current version of container metadata.
const metadataVersion = "v1" // nolint
// versionedMetadata is the internal versioned container metadata.
// nolint
type versionedMetadata struct {
// Version indicates the version of the versioned container metadata.
Version string
Metadata
}
// Metadata is the unversioned container metadata.
type Metadata struct {
// ID is the container id.
ID string
// Name is the container name.
Name string
// SandboxID is the sandbox id the container belongs to.
SandboxID string
// Config is the CRI container config.
Config *runtime.ContainerConfig
// ImageRef is the reference of image used by the container.
ImageRef string
}
// Encode encodes Metadata into bytes in json format.
func (c *Metadata) Encode() ([]byte, error) {
return json.Marshal(&versionedMetadata{
Version: metadataVersion,
Metadata: *c,
})
}
// Decode decodes Metadata from bytes.
func (c *Metadata) Decode(data []byte) error {
versioned := &versionedMetadata{}
if err := json.Unmarshal(data, versioned); err != nil {
return err
}
// Handle old version after upgrade.
switch versioned.Version {
case metadataVersion:
*c = versioned.Metadata
return nil
}
return fmt.Errorf("unsupported version")
}

View File

@@ -0,0 +1,53 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"encoding/json"
"testing"
assertlib "github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
func TestMetadataEncodeDecode(t *testing.T) {
meta := &Metadata{
ID: "test-id",
Name: "test-name",
SandboxID: "test-sandbox-id",
Config: &runtime.ContainerConfig{
Metadata: &runtime.ContainerMetadata{
Name: "test-name",
Attempt: 1,
},
},
ImageRef: "test-image-ref",
}
assert := assertlib.New(t)
data, err := meta.Encode()
assert.NoError(err)
newMeta := &Metadata{}
assert.NoError(newMeta.Decode(data))
assert.Equal(meta, newMeta)
unsupported, err := json.Marshal(&versionedMetadata{
Version: "random-test-version",
Metadata: *meta,
})
assert.NoError(err)
assert.Error(newMeta.Decode(unsupported))
}

View File

@@ -0,0 +1,146 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"sync"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
// TODO(random-liu): Handle versioning.
// TODO(random-liu): Add checkpoint support.
// version is current version of container status.
const version = "v1" // nolint
// versionedStatus is the internal used versioned container status.
// nolint
type versionedStatus struct {
// Version indicates the version of the versioned container status.
Version string
Status
}
// Status is the status of a container.
type Status struct {
// Pid is the init process id of the container.
Pid uint32
// CreatedAt is the created timestamp.
CreatedAt int64
// StartedAt is the started timestamp.
StartedAt int64
// FinishedAt is the finished timestamp.
FinishedAt int64
// ExitCode is the container exit code.
ExitCode int32
// CamelCase string explaining why container is in its current state.
Reason string
// Human-readable message indicating details about why container is in its
// current state.
Message string
// Removing indicates that the container is in removing state.
// This field doesn't need to be checkpointed.
// TODO(random-liu): Reset this field to false during state recoverry.
Removing bool
}
// State returns current state of the container based on the container status.
func (c Status) State() runtime.ContainerState {
if c.FinishedAt != 0 {
return runtime.ContainerState_CONTAINER_EXITED
}
if c.StartedAt != 0 {
return runtime.ContainerState_CONTAINER_RUNNING
}
if c.CreatedAt != 0 {
return runtime.ContainerState_CONTAINER_CREATED
}
return runtime.ContainerState_CONTAINER_UNKNOWN
}
// UpdateFunc is function used to update the container status. If there
// is an error, the update will be rolled back.
type UpdateFunc func(Status) (Status, error)
// StatusStorage manages the container status with a storage backend.
type StatusStorage interface {
// Get a container status.
Get() Status
// Update the container status. Note that the update MUST be applied
// in one transaction.
// TODO(random-liu): Distinguish `UpdateSync` and `Update`, only
// `UpdateSync` should sync data onto disk, so that disk operation
// for non-critical status change could be avoided.
Update(UpdateFunc) error
// Delete the container status.
// Note:
// * Delete should be idempotent.
// * The status must be deleted in one trasaction.
Delete() error
}
// TODO(random-liu): Add factory function and configure checkpoint path.
// StoreStatus creates the storage containing the passed in container status with the
// specified id.
// The status MUST be created in one transaction.
func StoreStatus(id string, status Status) (StatusStorage, error) {
return &statusStorage{status: status}, nil
// TODO(random-liu): Create the data on disk atomically.
}
// LoadStatus loads container status from checkpoint.
func LoadStatus(id string) (StatusStorage, error) {
// TODO(random-liu): Load container status from disk.
return nil, nil
}
type statusStorage struct {
sync.RWMutex
status Status
}
// Get a copy of container status.
func (m *statusStorage) Get() Status {
m.RLock()
defer m.RUnlock()
return m.status
}
// Update the container status.
func (m *statusStorage) Update(u UpdateFunc) error {
m.Lock()
defer m.Unlock()
newStatus, err := u(m.status)
if err != nil {
return err
}
// TODO(random-liu) *Update* existing status on disk atomically,
// return error if checkpoint failed.
m.status = newStatus
return nil
}
// Delete deletes the container status from disk atomically.
func (m *statusStorage) Delete() error {
// TODO(random-liu): Rename the data on the disk, returns error
// if fails. No lock is needed because file rename is atomic.
// TODO(random-liu): Cleanup temporary files generated, do not
// return error.
return nil
}

View File

@@ -0,0 +1,101 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"errors"
"testing"
"time"
assertlib "github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
func TestContainerState(t *testing.T) {
for c, test := range map[string]struct {
status Status
state runtime.ContainerState
}{
"unknown state": {
status: Status{},
state: runtime.ContainerState_CONTAINER_UNKNOWN,
},
"created state": {
status: Status{
CreatedAt: time.Now().UnixNano(),
},
state: runtime.ContainerState_CONTAINER_CREATED,
},
"running state": {
status: Status{
CreatedAt: time.Now().UnixNano(),
StartedAt: time.Now().UnixNano(),
},
state: runtime.ContainerState_CONTAINER_RUNNING,
},
"exited state": {
status: Status{
CreatedAt: time.Now().UnixNano(),
FinishedAt: time.Now().UnixNano(),
},
state: runtime.ContainerState_CONTAINER_EXITED,
},
} {
t.Logf("TestCase %q", c)
assertlib.Equal(t, test.state, test.status.State())
}
}
func TestStatus(t *testing.T) {
testID := "test-id"
testStatus := Status{
CreatedAt: time.Now().UnixNano(),
}
updateStatus := Status{
CreatedAt: time.Now().UnixNano(),
StartedAt: time.Now().UnixNano(),
}
updateErr := errors.New("update error")
assert := assertlib.New(t)
t.Logf("simple store and get")
s, err := StoreStatus(testID, testStatus)
assert.NoError(err)
old := s.Get()
assert.Equal(testStatus, old)
t.Logf("failed update should not take effect")
err = s.Update(func(o Status) (Status, error) {
o = updateStatus
return o, updateErr
})
assert.Equal(updateErr, err)
assert.Equal(testStatus, s.Get())
t.Logf("successful update should take effect")
err = s.Update(func(o Status) (Status, error) {
o = updateStatus
return o, nil
})
assert.NoError(err)
assert.Equal(updateStatus, s.Get())
t.Logf("successful update should not affect existing snapshot")
assert.Equal(testStatus, old)
// TODO(random-liu): Test Load and Delete after disc checkpoint is added.
}