From 36246167d9c71488b2b73f2772178c4bfc280070 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Thu, 20 Apr 2017 16:07:30 -0700 Subject: [PATCH 1/3] Add metadata store Signed-off-by: Random-Liu --- pkg/metadata/store/metadata_store.go | 236 +++++++++++++++++++++++++++ 1 file changed, 236 insertions(+) create mode 100644 pkg/metadata/store/metadata_store.go diff --git a/pkg/metadata/store/metadata_store.go b/pkg/metadata/store/metadata_store.go new file mode 100644 index 000000000..19b5ec02b --- /dev/null +++ b/pkg/metadata/store/metadata_store.go @@ -0,0 +1,236 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + "sync" + + "github.com/golang/glog" +) + +// All byte arrays are expected to be read-only. User MUST NOT modify byte +// array element directly!! + +// UpdateFunc is function used to update a specific metadata. The value +// passed in is the old value, it MUST NOT be changed in the function. +// The function should make a copy of the old value and apply update on +// the copy. The updated value should be returned. If there is an error, +// the update will be rolled back. +type UpdateFunc func([]byte) ([]byte, error) + +// MetadataStore is the interface for storing metadata. All methods should +// be thread-safe. +// TODO(random-liu): Initialize the metadata store with a type, and replace +// []byte with interface{}, so as to avoid extra marshal/unmarshal on the +// user side. +type MetadataStore interface { + // Create the metadata containing the passed in data with the + // specified id. + // Note: + // * Create MUST return error if the id already exists. + // * The id and data MUST be added in one transaction to the store. + Create(string, []byte) error + // Get the data by id. + // Note that Get MUST return nil without error if the id + // doesn't exist. + Get(string) ([]byte, error) + // Update the data by id. Note that the update MUST be applied in + // one transaction. + Update(string, UpdateFunc) error + // List returns entire array of data from the store. + List() ([][]byte, error) + // Delete the data by id. + // Note: + // * Delete should be idempotent, it MUST not return error if the id + // doesn't exist or has been removed. + // * The id and data MUST be deleted in one transaction. + Delete(string) error +} + +// TODO(random-liu) Add checkpoint. When checkpoint is enabled, it should cache data +// in memory and checkpoint metadata into files during update. Metadata should serve +// from memory, but any modification should be checkpointed, so that memory could be +// recovered after restart. It should be possible to disable the checkpoint for testing. +// Note that checkpoint update may fail, so the recovery logic should tolerate that. + +// metadata is the internal type for storing data in metadataStore. +type metadata struct { + sync.RWMutex + data []byte +} + +// newMetadata creates a new metadata. +func newMetadata(data []byte) (*metadata, error) { + return &metadata{data: data}, nil + // TODO(random-liu): Create the data on disk atomically. +} + +// get a snapshot of the metadata. +func (m *metadata) get() []byte { + m.RLock() + defer m.RUnlock() + return m.data +} + +// update the value. +func (m *metadata) update(u UpdateFunc) error { + m.Lock() + defer m.Unlock() + newData, err := u(m.data) + if err != nil { + return err + } + // Replace with newData, user holding the old data will not + // be affected. + // TODO(random-liu) *Update* existing data on disk atomically, + // return error if checkpoint failed. + m.data = newData + return nil +} + +// delete deletes the data on disk atomically. +func (m *metadata) delete() error { + // TODO(random-liu): Hold write lock, rename the data on the disk. + return nil +} + +// cleanup cleans up all temporary files left-over. +func (m *metadata) cleanup() error { + // TODO(random-liu): Hold write lock, Cleanup temporary files generated + // in atomic file operations. The write lock makes sure there is no on-going + // update, so any temporary files could be removed. + return nil +} + +// metadataStore is metadataStore is an implementation of MetadataStore. +type metadataStore struct { + sync.RWMutex + metas map[string]*metadata +} + +// NewMetadataStore creates a MetadataStore. +func NewMetadataStore() MetadataStore { + // TODO(random-liu): Recover state from disk checkpoint. + // TODO(random-liu): Cleanup temporary files left over. + return &metadataStore{metas: map[string]*metadata{}} +} + +// createMetadata creates metadata with a read-write lock +func (m *metadataStore) createMetadata(id string, meta *metadata) error { + m.Lock() + defer m.Unlock() + if _, found := m.metas[id]; found { + return fmt.Errorf("id %q already exists", id) + } + m.metas[id] = meta + return nil +} + +// Create the metadata with a specific id. +func (m *metadataStore) Create(id string, data []byte) (retErr error) { + // newMetadata takes time, we may not want to lock around it. + meta, err := newMetadata(data) + if err != nil { + return err + } + defer func() { + // This should not happen, because if id already exists, + // newMetadata should fail to checkpoint. Add this just + // in case. + if retErr != nil { + meta.delete() // nolint: errcheck + meta.cleanup() // nolint: errcheck + } + }() + return m.createMetadata(id, meta) +} + +// getMetadata gets metadata by id with a read lock. +func (m *metadataStore) getMetadata(id string) (*metadata, bool) { + m.RLock() + defer m.RUnlock() + meta, found := m.metas[id] + return meta, found +} + +// Get data by id. +func (m *metadataStore) Get(id string) ([]byte, error) { + meta, found := m.getMetadata(id) + if !found { + return nil, nil + } + return meta.get(), nil +} + +// Update data by id. +func (m *metadataStore) Update(id string, u UpdateFunc) error { + meta, found := m.getMetadata(id) + if !found { + return fmt.Errorf("id %q doesn't exist", id) + } + return meta.update(u) +} + +// listMetadata lists all metadata with a read lock. +func (m *metadataStore) listMetadata() []*metadata { + m.RLock() + defer m.RUnlock() + var metas []*metadata + for _, meta := range m.metas { + metas = append(metas, meta) + } + return metas +} + +// List all data. +func (m *metadataStore) List() ([][]byte, error) { + metas := m.listMetadata() + var data [][]byte + for _, meta := range metas { + data = append(data, meta.get()) + } + return data, nil +} + +// Delete the data by id. +func (m *metadataStore) Delete(id string) error { + meta, err := func() (*metadata, error) { + m.Lock() + defer m.Unlock() + meta := m.metas[id] + if meta == nil { + return nil, nil + } + if err := meta.delete(); err != nil { + return nil, err + } + delete(m.metas, id) + return meta, nil + }() + if err != nil { + return err + } + // The metadata is removed from the store at this point. + if meta != nil { + // Do not return error for cleanup. + if err := meta.cleanup(); err != nil { + glog.Errorf("Failed to cleanup metadata %q: %v", id, err) + } + } + return nil +} From 0e7fa9de9b5d43bc56464dd4536ccbddc5c2e13a Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Thu, 20 Apr 2017 16:07:47 -0700 Subject: [PATCH 2/3] Add a sandbox metadata store based on the metadata store Signed-off-by: Random-Liu --- pkg/metadata/sandbox.go | 157 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 pkg/metadata/sandbox.go diff --git a/pkg/metadata/sandbox.go b/pkg/metadata/sandbox.go new file mode 100644 index 000000000..b7b308933 --- /dev/null +++ b/pkg/metadata/sandbox.go @@ -0,0 +1,157 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadata + +import ( + "encoding/json" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" + + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" +) + +// TODO(random-liu): Handle versioning around all marshal/unmarshal. +// version and versionedSandboxMetadata is not used now, but should be used +// in the future: +// 1) Only versioned metadata should be written into the store; +// 2) Only unversioned metadata should be returned to the user; +// 3) A conversion function is needed to convert any supported versioned +// metadata into unversioned metadata. + +// sandboxMetadataVersion is current version of sandbox metadata. +const sandboxMetadataVersion = "v1" // nolint + +// versionedSandboxMetadata is the internal struct representing the versioned +// sandbox metadata +// nolint +type versionedSandboxMetadata struct { + // Version indicates the version of the versioned sandbox metadata. + Version string + SandboxMetadata +} + +// SandboxMetadata is the unversioned sandbox metadata. +type SandboxMetadata struct { + // ID is the sandbox id. + ID string + // Name is the sandbox name. + Name string + // Config is the CRI sandbox config. + Config *runtime.PodSandboxConfig + // CreatedAt is the created timestamp. + CreatedAt int64 + // NetNS is the network namespace used by the sandbox. + NetNS string +} + +// SandboxUpdateFunc is the function used to update SandboxMetadata. +type SandboxUpdateFunc func(SandboxMetadata) (SandboxMetadata, error) + +// sandboxToStoreUpdateFunc generates a metadata store UpdateFunc from SandboxUpdateFunc. +func sandboxToStoreUpdateFunc(u SandboxUpdateFunc) store.UpdateFunc { + return func(data []byte) ([]byte, error) { + meta := &SandboxMetadata{} + if err := json.Unmarshal(data, meta); err != nil { + return nil, err + } + newMeta, err := u(*meta) + if err != nil { + return nil, err + } + return json.Marshal(newMeta) + } +} + +// SandboxStore is the store for metadata of all sandboxes. +type SandboxStore interface { + // Create creates a sandbox from SandboxMetadata in the store. + Create(SandboxMetadata) error + // Get gets the specified sandbox. + Get(string) (*SandboxMetadata, error) + // Update updates a specified sandbox. + Update(string, SandboxUpdateFunc) error + // List lists all sandboxes. + List() ([]*SandboxMetadata, error) + // Delete deletes the sandbox from the store. + Delete(string) error +} + +// sandboxStore is an implmentation of SandboxStore. +type sandboxStore struct { + store store.MetadataStore +} + +// NewSandboxStore creates a SandboxStore from a basic MetadataStore. +func NewSandboxStore(store store.MetadataStore) SandboxStore { + return &sandboxStore{store: store} +} + +// Create creates a sandbox from SandboxMetadata in the store. +func (s *sandboxStore) Create(metadata SandboxMetadata) error { + data, err := json.Marshal(&metadata) + if err != nil { + return err + } + return s.store.Create(metadata.ID, data) +} + +// Get gets the specified sandbox. +func (s *sandboxStore) Get(sandboxID string) (*SandboxMetadata, error) { + data, err := s.store.Get(sandboxID) + if err != nil { + return nil, err + } + // Return nil without error if the corresponding metadata + // does not exist. + if data == nil { + return nil, nil + } + sandbox := &SandboxMetadata{} + if err := json.Unmarshal(data, sandbox); err != nil { + return nil, err + } + return sandbox, nil +} + +// Update updates a specified sandbox. The function is running in a +// transaction. Update will not be applied when the update function +// returns error. +func (s *sandboxStore) Update(sandboxID string, u SandboxUpdateFunc) error { + return s.store.Update(sandboxID, sandboxToStoreUpdateFunc(u)) +} + +// List lists all sandboxes. +func (s *sandboxStore) List() ([]*SandboxMetadata, error) { + allData, err := s.store.List() + if err != nil { + return nil, err + } + var sandboxes []*SandboxMetadata + for _, data := range allData { + sandbox := &SandboxMetadata{} + if err := json.Unmarshal(data, sandbox); err != nil { + return nil, err + } + sandboxes = append(sandboxes, sandbox) + } + return sandboxes, nil +} + +// Delete deletes the sandbox from the store. +func (s *sandboxStore) Delete(sandboxID string) error { + return s.store.Delete(sandboxID) +} From 86997f00b22caf184c4710d0197088bd2ad7acb2 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Tue, 25 Apr 2017 18:27:41 -0700 Subject: [PATCH 3/3] Add unit test for metadata store Signed-off-by: Random-Liu --- pkg/metadata/sandbox_test.go | 120 +++++++++++++ pkg/metadata/store/metadata_store_test.go | 197 ++++++++++++++++++++++ 2 files changed, 317 insertions(+) create mode 100644 pkg/metadata/sandbox_test.go create mode 100644 pkg/metadata/store/metadata_store_test.go diff --git a/pkg/metadata/sandbox_test.go b/pkg/metadata/sandbox_test.go new file mode 100644 index 000000000..8eb1cab86 --- /dev/null +++ b/pkg/metadata/sandbox_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadata + +import ( + "testing" + "time" + + assertlib "github.com/stretchr/testify/assert" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" + + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" +) + +func TestSandboxStore(t *testing.T) { + sandboxes := map[string]*SandboxMetadata{ + "1": { + ID: "1", + Name: "Sandbox-1", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "TestPod-1", + Uid: "TestUid-1", + Namespace: "TestNamespace-1", + Attempt: 1, + }, + }, + CreatedAt: time.Now().UnixNano(), + NetNS: "TestNetNS-1", + }, + "2": { + ID: "2", + Name: "Sandbox-2", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "TestPod-2", + Uid: "TestUid-2", + Namespace: "TestNamespace-2", + Attempt: 2, + }, + }, + CreatedAt: time.Now().UnixNano(), + NetNS: "TestNetNS-2", + }, + "3": { + ID: "3", + Name: "Sandbox-3", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "TestPod-3", + Uid: "TestUid-3", + Namespace: "TestNamespace-3", + Attempt: 3, + }, + }, + CreatedAt: time.Now().UnixNano(), + NetNS: "TestNetNS-3", + }, + } + assert := assertlib.New(t) + + s := NewSandboxStore(store.NewMetadataStore()) + + t.Logf("should be able to create sandbox metadata") + for _, meta := range sandboxes { + assert.NoError(s.Create(*meta)) + } + + t.Logf("should be able to get sandbox metadata") + for id, expectMeta := range sandboxes { + meta, err := s.Get(id) + assert.NoError(err) + assert.Equal(expectMeta, meta) + } + + t.Logf("should be able to list sandbox metadata") + sbs, err := s.List() + assert.NoError(err) + assert.Len(sbs, 3) + + t.Logf("should be able to update sandbox metadata") + testID := "2" + newCreatedAt := time.Now().UnixNano() + expectMeta := *sandboxes[testID] + expectMeta.CreatedAt = newCreatedAt + err = s.Update(testID, func(o SandboxMetadata) (SandboxMetadata, error) { + o.CreatedAt = newCreatedAt + return o, nil + }) + assert.NoError(err) + newMeta, err := s.Get(testID) + assert.NoError(err) + assert.Equal(&expectMeta, newMeta) + + t.Logf("should be able to delete sandbox metadata") + assert.NoError(s.Delete(testID)) + sbs, err = s.List() + assert.NoError(err) + assert.Len(sbs, 2) + + t.Logf("get should return nil without error after deletion") + meta, err := s.Get(testID) + assert.NoError(err) + assert.Nil(meta) +} diff --git a/pkg/metadata/store/metadata_store_test.go b/pkg/metadata/store/metadata_store_test.go new file mode 100644 index 000000000..2b8491dac --- /dev/null +++ b/pkg/metadata/store/metadata_store_test.go @@ -0,0 +1,197 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "bytes" + "errors" + "fmt" + "sync" + "testing" + + assertlib "github.com/stretchr/testify/assert" +) + +func TestMetadata(t *testing.T) { + testData := [][]byte{ + []byte("test-data-1"), + []byte("test-data-2"), + } + updateErr := errors.New("update error") + assert := assertlib.New(t) + + t.Logf("simple create and get") + meta, err := newMetadata(testData[0]) + assert.NoError(err) + old := meta.get() + assert.Equal(testData[0], old) + + t.Logf("failed update should not take effect") + err = meta.update(func(in []byte) ([]byte, error) { + return testData[1], updateErr + }) + assert.Equal(updateErr, err) + assert.Equal(testData[0], meta.get()) + + t.Logf("successful update should take effect") + err = meta.update(func(in []byte) ([]byte, error) { + return testData[1], nil + }) + assert.NoError(err) + assert.Equal(testData[1], meta.get()) + + t.Logf("successful update should not affect existing snapshot") + assert.Equal(testData[0], old) + + // TODO(random-liu): Test deleteCheckpoint and cleanupCheckpoint after + // disk based implementation is added. +} + +func TestMetadataStore(t *testing.T) { + testIds := []string{"id-0", "id-1"} + testMeta := map[string][]byte{ + testIds[0]: []byte("metadata-0"), + testIds[1]: []byte("metadata-1"), + } + assert := assertlib.New(t) + + m := NewMetadataStore() + + t.Logf("should be empty initially") + metas, err := m.List() + assert.NoError(err) + assert.Empty(metas) + + t.Logf("should be able to create metadata") + err = m.Create(testIds[0], testMeta[testIds[0]]) + assert.NoError(err) + + t.Logf("should not be able to create metadata with the same id") + err = m.Create(testIds[0], testMeta[testIds[0]]) + assert.Error(err) + + t.Logf("should be able to list metadata") + err = m.Create(testIds[1], testMeta[testIds[1]]) + assert.NoError(err) + metas, err = m.List() + assert.NoError(err) + assert.True(sliceContainsMap(metas, testMeta)) + + t.Logf("should be able to get metadata by id") + meta, err := m.Get(testIds[1]) + assert.NoError(err) + assert.Equal(testMeta[testIds[1]], meta) + + t.Logf("update should take effect") + m.Update(testIds[1], func(in []byte) ([]byte, error) { + return []byte("updated-metadata-1"), nil + }) + newMeta, err := m.Get(testIds[1]) + assert.NoError(err) + assert.Equal([]byte("updated-metadata-1"), newMeta) + + t.Logf("should be able to delete metadata") + assert.NoError(m.Delete(testIds[1])) + metas, err = m.List() + assert.NoError(err) + assert.Len(metas, 1) + assert.Equal(testMeta[testIds[0]], metas[0]) + meta, err = m.Get(testIds[1]) + assert.NoError(err) + assert.Nil(meta) + + t.Logf("existing reference should not be affected by delete") + assert.Equal([]byte("updated-metadata-1"), newMeta) + + t.Logf("should be able to reuse the same id after deletion") + err = m.Create(testIds[1], testMeta[testIds[1]]) + assert.NoError(err) +} + +// sliceMatchMap checks the same elements with a map. +func sliceContainsMap(s [][]byte, m map[string][]byte) bool { + if len(m) != len(s) { + return false + } + for _, expect := range m { + found := false + for _, got := range s { + if bytes.Equal(expect, got) { + found = true + break + } + } + if !found { + return false + } + } + return true +} + +func TestMultithreadAccess(t *testing.T) { + m := NewMetadataStore() + assert := assertlib.New(t) + routineNum := 10 + var wg sync.WaitGroup + for i := 0; i < routineNum; i++ { + wg.Add(1) + go func(i int) { + id := fmt.Sprintf("%d", i) + + t.Logf("should be able to create id %q", id) + expect := []byte(id) + err := m.Create(id, expect) + assert.NoError(err) + + got, err := m.Get(id) + assert.NoError(err) + assert.Equal(expect, got) + + gotList, err := m.List() + assert.NoError(err) + assert.Contains(gotList, expect) + + t.Logf("should be able to update id %q", id) + expect = []byte("update-" + id) + err = m.Update(id, func([]byte) ([]byte, error) { + return expect, nil + }) + assert.NoError(err) + + got, err = m.Get(id) + assert.NoError(err) + assert.Equal(expect, got) + + t.Logf("should be able to delete id %q", id) + err = m.Delete(id) + assert.NoError(err) + + got, err = m.Get(id) + assert.NoError(err) + assert.Nil(got) + + gotList, err = m.List() + assert.NoError(err) + assert.NotContains(gotList, expect) + + wg.Done() + }(i) + } + wg.Wait() +} + +// TODO(random-liu): Test recover logic once checkpoint recovery is added.