Merge pull request #9 from Random-Liu/add-metadata-store

Add the metadata store interface and an in-memory implementation.
This commit is contained in:
Lantao Liu 2017-05-03 14:18:39 -07:00 committed by GitHub
commit 1532079c84
4 changed files with 710 additions and 0 deletions

157
pkg/metadata/sandbox.go Normal file
View File

@ -0,0 +1,157 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metadata
import (
"encoding/json"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
)
// TODO(random-liu): Handle versioning around all marshal/unmarshal.
// version and versionedSandboxMetadata is not used now, but should be used
// in the future:
// 1) Only versioned metadata should be written into the store;
// 2) Only unversioned metadata should be returned to the user;
// 3) A conversion function is needed to convert any supported versioned
// metadata into unversioned metadata.
// sandboxMetadataVersion is current version of sandbox metadata.
const sandboxMetadataVersion = "v1" // nolint
// versionedSandboxMetadata is the internal struct representing the versioned
// sandbox metadata
// nolint
type versionedSandboxMetadata struct {
// Version indicates the version of the versioned sandbox metadata.
Version string
SandboxMetadata
}
// SandboxMetadata is the unversioned sandbox metadata.
type SandboxMetadata struct {
// ID is the sandbox id.
ID string
// Name is the sandbox name.
Name string
// Config is the CRI sandbox config.
Config *runtime.PodSandboxConfig
// CreatedAt is the created timestamp.
CreatedAt int64
// NetNS is the network namespace used by the sandbox.
NetNS string
}
// SandboxUpdateFunc is the function used to update SandboxMetadata.
type SandboxUpdateFunc func(SandboxMetadata) (SandboxMetadata, error)
// sandboxToStoreUpdateFunc generates a metadata store UpdateFunc from SandboxUpdateFunc.
func sandboxToStoreUpdateFunc(u SandboxUpdateFunc) store.UpdateFunc {
return func(data []byte) ([]byte, error) {
meta := &SandboxMetadata{}
if err := json.Unmarshal(data, meta); err != nil {
return nil, err
}
newMeta, err := u(*meta)
if err != nil {
return nil, err
}
return json.Marshal(newMeta)
}
}
// SandboxStore is the store for metadata of all sandboxes.
type SandboxStore interface {
// Create creates a sandbox from SandboxMetadata in the store.
Create(SandboxMetadata) error
// Get gets the specified sandbox.
Get(string) (*SandboxMetadata, error)
// Update updates a specified sandbox.
Update(string, SandboxUpdateFunc) error
// List lists all sandboxes.
List() ([]*SandboxMetadata, error)
// Delete deletes the sandbox from the store.
Delete(string) error
}
// sandboxStore is an implmentation of SandboxStore.
type sandboxStore struct {
store store.MetadataStore
}
// NewSandboxStore creates a SandboxStore from a basic MetadataStore.
func NewSandboxStore(store store.MetadataStore) SandboxStore {
return &sandboxStore{store: store}
}
// Create creates a sandbox from SandboxMetadata in the store.
func (s *sandboxStore) Create(metadata SandboxMetadata) error {
data, err := json.Marshal(&metadata)
if err != nil {
return err
}
return s.store.Create(metadata.ID, data)
}
// Get gets the specified sandbox.
func (s *sandboxStore) Get(sandboxID string) (*SandboxMetadata, error) {
data, err := s.store.Get(sandboxID)
if err != nil {
return nil, err
}
// Return nil without error if the corresponding metadata
// does not exist.
if data == nil {
return nil, nil
}
sandbox := &SandboxMetadata{}
if err := json.Unmarshal(data, sandbox); err != nil {
return nil, err
}
return sandbox, nil
}
// Update updates a specified sandbox. The function is running in a
// transaction. Update will not be applied when the update function
// returns error.
func (s *sandboxStore) Update(sandboxID string, u SandboxUpdateFunc) error {
return s.store.Update(sandboxID, sandboxToStoreUpdateFunc(u))
}
// List lists all sandboxes.
func (s *sandboxStore) List() ([]*SandboxMetadata, error) {
allData, err := s.store.List()
if err != nil {
return nil, err
}
var sandboxes []*SandboxMetadata
for _, data := range allData {
sandbox := &SandboxMetadata{}
if err := json.Unmarshal(data, sandbox); err != nil {
return nil, err
}
sandboxes = append(sandboxes, sandbox)
}
return sandboxes, nil
}
// Delete deletes the sandbox from the store.
func (s *sandboxStore) Delete(sandboxID string) error {
return s.store.Delete(sandboxID)
}

View File

@ -0,0 +1,120 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metadata
import (
"testing"
"time"
assertlib "github.com/stretchr/testify/assert"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
)
func TestSandboxStore(t *testing.T) {
sandboxes := map[string]*SandboxMetadata{
"1": {
ID: "1",
Name: "Sandbox-1",
Config: &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: "TestPod-1",
Uid: "TestUid-1",
Namespace: "TestNamespace-1",
Attempt: 1,
},
},
CreatedAt: time.Now().UnixNano(),
NetNS: "TestNetNS-1",
},
"2": {
ID: "2",
Name: "Sandbox-2",
Config: &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: "TestPod-2",
Uid: "TestUid-2",
Namespace: "TestNamespace-2",
Attempt: 2,
},
},
CreatedAt: time.Now().UnixNano(),
NetNS: "TestNetNS-2",
},
"3": {
ID: "3",
Name: "Sandbox-3",
Config: &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: "TestPod-3",
Uid: "TestUid-3",
Namespace: "TestNamespace-3",
Attempt: 3,
},
},
CreatedAt: time.Now().UnixNano(),
NetNS: "TestNetNS-3",
},
}
assert := assertlib.New(t)
s := NewSandboxStore(store.NewMetadataStore())
t.Logf("should be able to create sandbox metadata")
for _, meta := range sandboxes {
assert.NoError(s.Create(*meta))
}
t.Logf("should be able to get sandbox metadata")
for id, expectMeta := range sandboxes {
meta, err := s.Get(id)
assert.NoError(err)
assert.Equal(expectMeta, meta)
}
t.Logf("should be able to list sandbox metadata")
sbs, err := s.List()
assert.NoError(err)
assert.Len(sbs, 3)
t.Logf("should be able to update sandbox metadata")
testID := "2"
newCreatedAt := time.Now().UnixNano()
expectMeta := *sandboxes[testID]
expectMeta.CreatedAt = newCreatedAt
err = s.Update(testID, func(o SandboxMetadata) (SandboxMetadata, error) {
o.CreatedAt = newCreatedAt
return o, nil
})
assert.NoError(err)
newMeta, err := s.Get(testID)
assert.NoError(err)
assert.Equal(&expectMeta, newMeta)
t.Logf("should be able to delete sandbox metadata")
assert.NoError(s.Delete(testID))
sbs, err = s.List()
assert.NoError(err)
assert.Len(sbs, 2)
t.Logf("get should return nil without error after deletion")
meta, err := s.Get(testID)
assert.NoError(err)
assert.Nil(meta)
}

View File

@ -0,0 +1,236 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"sync"
"github.com/golang/glog"
)
// All byte arrays are expected to be read-only. User MUST NOT modify byte
// array element directly!!
// UpdateFunc is function used to update a specific metadata. The value
// passed in is the old value, it MUST NOT be changed in the function.
// The function should make a copy of the old value and apply update on
// the copy. The updated value should be returned. If there is an error,
// the update will be rolled back.
type UpdateFunc func([]byte) ([]byte, error)
// MetadataStore is the interface for storing metadata. All methods should
// be thread-safe.
// TODO(random-liu): Initialize the metadata store with a type, and replace
// []byte with interface{}, so as to avoid extra marshal/unmarshal on the
// user side.
type MetadataStore interface {
// Create the metadata containing the passed in data with the
// specified id.
// Note:
// * Create MUST return error if the id already exists.
// * The id and data MUST be added in one transaction to the store.
Create(string, []byte) error
// Get the data by id.
// Note that Get MUST return nil without error if the id
// doesn't exist.
Get(string) ([]byte, error)
// Update the data by id. Note that the update MUST be applied in
// one transaction.
Update(string, UpdateFunc) error
// List returns entire array of data from the store.
List() ([][]byte, error)
// Delete the data by id.
// Note:
// * Delete should be idempotent, it MUST not return error if the id
// doesn't exist or has been removed.
// * The id and data MUST be deleted in one transaction.
Delete(string) error
}
// TODO(random-liu) Add checkpoint. When checkpoint is enabled, it should cache data
// in memory and checkpoint metadata into files during update. Metadata should serve
// from memory, but any modification should be checkpointed, so that memory could be
// recovered after restart. It should be possible to disable the checkpoint for testing.
// Note that checkpoint update may fail, so the recovery logic should tolerate that.
// metadata is the internal type for storing data in metadataStore.
type metadata struct {
sync.RWMutex
data []byte
}
// newMetadata creates a new metadata.
func newMetadata(data []byte) (*metadata, error) {
return &metadata{data: data}, nil
// TODO(random-liu): Create the data on disk atomically.
}
// get a snapshot of the metadata.
func (m *metadata) get() []byte {
m.RLock()
defer m.RUnlock()
return m.data
}
// update the value.
func (m *metadata) update(u UpdateFunc) error {
m.Lock()
defer m.Unlock()
newData, err := u(m.data)
if err != nil {
return err
}
// Replace with newData, user holding the old data will not
// be affected.
// TODO(random-liu) *Update* existing data on disk atomically,
// return error if checkpoint failed.
m.data = newData
return nil
}
// delete deletes the data on disk atomically.
func (m *metadata) delete() error {
// TODO(random-liu): Hold write lock, rename the data on the disk.
return nil
}
// cleanup cleans up all temporary files left-over.
func (m *metadata) cleanup() error {
// TODO(random-liu): Hold write lock, Cleanup temporary files generated
// in atomic file operations. The write lock makes sure there is no on-going
// update, so any temporary files could be removed.
return nil
}
// metadataStore is metadataStore is an implementation of MetadataStore.
type metadataStore struct {
sync.RWMutex
metas map[string]*metadata
}
// NewMetadataStore creates a MetadataStore.
func NewMetadataStore() MetadataStore {
// TODO(random-liu): Recover state from disk checkpoint.
// TODO(random-liu): Cleanup temporary files left over.
return &metadataStore{metas: map[string]*metadata{}}
}
// createMetadata creates metadata with a read-write lock
func (m *metadataStore) createMetadata(id string, meta *metadata) error {
m.Lock()
defer m.Unlock()
if _, found := m.metas[id]; found {
return fmt.Errorf("id %q already exists", id)
}
m.metas[id] = meta
return nil
}
// Create the metadata with a specific id.
func (m *metadataStore) Create(id string, data []byte) (retErr error) {
// newMetadata takes time, we may not want to lock around it.
meta, err := newMetadata(data)
if err != nil {
return err
}
defer func() {
// This should not happen, because if id already exists,
// newMetadata should fail to checkpoint. Add this just
// in case.
if retErr != nil {
meta.delete() // nolint: errcheck
meta.cleanup() // nolint: errcheck
}
}()
return m.createMetadata(id, meta)
}
// getMetadata gets metadata by id with a read lock.
func (m *metadataStore) getMetadata(id string) (*metadata, bool) {
m.RLock()
defer m.RUnlock()
meta, found := m.metas[id]
return meta, found
}
// Get data by id.
func (m *metadataStore) Get(id string) ([]byte, error) {
meta, found := m.getMetadata(id)
if !found {
return nil, nil
}
return meta.get(), nil
}
// Update data by id.
func (m *metadataStore) Update(id string, u UpdateFunc) error {
meta, found := m.getMetadata(id)
if !found {
return fmt.Errorf("id %q doesn't exist", id)
}
return meta.update(u)
}
// listMetadata lists all metadata with a read lock.
func (m *metadataStore) listMetadata() []*metadata {
m.RLock()
defer m.RUnlock()
var metas []*metadata
for _, meta := range m.metas {
metas = append(metas, meta)
}
return metas
}
// List all data.
func (m *metadataStore) List() ([][]byte, error) {
metas := m.listMetadata()
var data [][]byte
for _, meta := range metas {
data = append(data, meta.get())
}
return data, nil
}
// Delete the data by id.
func (m *metadataStore) Delete(id string) error {
meta, err := func() (*metadata, error) {
m.Lock()
defer m.Unlock()
meta := m.metas[id]
if meta == nil {
return nil, nil
}
if err := meta.delete(); err != nil {
return nil, err
}
delete(m.metas, id)
return meta, nil
}()
if err != nil {
return err
}
// The metadata is removed from the store at this point.
if meta != nil {
// Do not return error for cleanup.
if err := meta.cleanup(); err != nil {
glog.Errorf("Failed to cleanup metadata %q: %v", id, err)
}
}
return nil
}

View File

@ -0,0 +1,197 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"bytes"
"errors"
"fmt"
"sync"
"testing"
assertlib "github.com/stretchr/testify/assert"
)
func TestMetadata(t *testing.T) {
testData := [][]byte{
[]byte("test-data-1"),
[]byte("test-data-2"),
}
updateErr := errors.New("update error")
assert := assertlib.New(t)
t.Logf("simple create and get")
meta, err := newMetadata(testData[0])
assert.NoError(err)
old := meta.get()
assert.Equal(testData[0], old)
t.Logf("failed update should not take effect")
err = meta.update(func(in []byte) ([]byte, error) {
return testData[1], updateErr
})
assert.Equal(updateErr, err)
assert.Equal(testData[0], meta.get())
t.Logf("successful update should take effect")
err = meta.update(func(in []byte) ([]byte, error) {
return testData[1], nil
})
assert.NoError(err)
assert.Equal(testData[1], meta.get())
t.Logf("successful update should not affect existing snapshot")
assert.Equal(testData[0], old)
// TODO(random-liu): Test deleteCheckpoint and cleanupCheckpoint after
// disk based implementation is added.
}
func TestMetadataStore(t *testing.T) {
testIds := []string{"id-0", "id-1"}
testMeta := map[string][]byte{
testIds[0]: []byte("metadata-0"),
testIds[1]: []byte("metadata-1"),
}
assert := assertlib.New(t)
m := NewMetadataStore()
t.Logf("should be empty initially")
metas, err := m.List()
assert.NoError(err)
assert.Empty(metas)
t.Logf("should be able to create metadata")
err = m.Create(testIds[0], testMeta[testIds[0]])
assert.NoError(err)
t.Logf("should not be able to create metadata with the same id")
err = m.Create(testIds[0], testMeta[testIds[0]])
assert.Error(err)
t.Logf("should be able to list metadata")
err = m.Create(testIds[1], testMeta[testIds[1]])
assert.NoError(err)
metas, err = m.List()
assert.NoError(err)
assert.True(sliceContainsMap(metas, testMeta))
t.Logf("should be able to get metadata by id")
meta, err := m.Get(testIds[1])
assert.NoError(err)
assert.Equal(testMeta[testIds[1]], meta)
t.Logf("update should take effect")
m.Update(testIds[1], func(in []byte) ([]byte, error) {
return []byte("updated-metadata-1"), nil
})
newMeta, err := m.Get(testIds[1])
assert.NoError(err)
assert.Equal([]byte("updated-metadata-1"), newMeta)
t.Logf("should be able to delete metadata")
assert.NoError(m.Delete(testIds[1]))
metas, err = m.List()
assert.NoError(err)
assert.Len(metas, 1)
assert.Equal(testMeta[testIds[0]], metas[0])
meta, err = m.Get(testIds[1])
assert.NoError(err)
assert.Nil(meta)
t.Logf("existing reference should not be affected by delete")
assert.Equal([]byte("updated-metadata-1"), newMeta)
t.Logf("should be able to reuse the same id after deletion")
err = m.Create(testIds[1], testMeta[testIds[1]])
assert.NoError(err)
}
// sliceMatchMap checks the same elements with a map.
func sliceContainsMap(s [][]byte, m map[string][]byte) bool {
if len(m) != len(s) {
return false
}
for _, expect := range m {
found := false
for _, got := range s {
if bytes.Equal(expect, got) {
found = true
break
}
}
if !found {
return false
}
}
return true
}
func TestMultithreadAccess(t *testing.T) {
m := NewMetadataStore()
assert := assertlib.New(t)
routineNum := 10
var wg sync.WaitGroup
for i := 0; i < routineNum; i++ {
wg.Add(1)
go func(i int) {
id := fmt.Sprintf("%d", i)
t.Logf("should be able to create id %q", id)
expect := []byte(id)
err := m.Create(id, expect)
assert.NoError(err)
got, err := m.Get(id)
assert.NoError(err)
assert.Equal(expect, got)
gotList, err := m.List()
assert.NoError(err)
assert.Contains(gotList, expect)
t.Logf("should be able to update id %q", id)
expect = []byte("update-" + id)
err = m.Update(id, func([]byte) ([]byte, error) {
return expect, nil
})
assert.NoError(err)
got, err = m.Get(id)
assert.NoError(err)
assert.Equal(expect, got)
t.Logf("should be able to delete id %q", id)
err = m.Delete(id)
assert.NoError(err)
got, err = m.Get(id)
assert.NoError(err)
assert.Nil(got)
gotList, err = m.List()
assert.NoError(err)
assert.NotContains(gotList, expect)
wg.Done()
}(i)
}
wg.Wait()
}
// TODO(random-liu): Test recover logic once checkpoint recovery is added.