Add ImageFsInfo support

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2017-09-18 07:52:33 +00:00
parent b85be3d0cd
commit 491400c892
13 changed files with 507 additions and 7 deletions

View File

@@ -17,7 +17,7 @@ limitations under the License.
package server
import (
"errors"
"time"
"golang.org/x/net/context"
@@ -26,5 +26,26 @@ import (
// ImageFsInfo returns information of the filesystem that is used to store images.
func (c *criContainerdService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (*runtime.ImageFsInfoResponse, error) {
return nil, errors.New("not implemented")
snapshots := c.snapshotStore.List()
timestamp := time.Now().UnixNano()
var usedBytes, inodesUsed uint64
for _, sn := range snapshots {
// Use the oldest timestamp as the timestamp of imagefs info.
if sn.Timestamp < timestamp {
timestamp = sn.Timestamp
}
usedBytes += sn.Size
inodesUsed += sn.Inodes
}
// TODO(random-liu): Handle content store
return &runtime.ImageFsInfoResponse{
ImageFilesystems: []*runtime.FilesystemUsage{
{
Timestamp: timestamp,
StorageId: &runtime.StorageIdentifier{Uuid: c.imageFSUUID},
UsedBytes: &runtime.UInt64Value{Value: usedBytes},
InodesUsed: &runtime.UInt64Value{Value: inodesUsed},
},
},
}, nil
}

View File

@@ -0,0 +1,70 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"testing"
"github.com/containerd/containerd/snapshot"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
snapshotstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/snapshot"
)
func TestImageFsInfo(t *testing.T) {
c := newTestCRIContainerdService()
snapshots := []snapshotstore.Snapshot{
{
Key: "key1",
Kind: snapshot.KindActive,
Size: 10,
Inodes: 100,
Timestamp: 234567,
},
{
Key: "key2",
Kind: snapshot.KindCommitted,
Size: 20,
Inodes: 200,
Timestamp: 123456,
},
{
Key: "key3",
Kind: snapshot.KindView,
Size: 0,
Inodes: 0,
Timestamp: 345678,
},
}
expected := &runtime.FilesystemUsage{
Timestamp: 123456,
StorageId: &runtime.StorageIdentifier{Uuid: testImageFSUUID},
UsedBytes: &runtime.UInt64Value{Value: 30},
InodesUsed: &runtime.UInt64Value{Value: 300},
}
for _, sn := range snapshots {
c.snapshotStore.Add(sn)
}
resp, err := c.ImageFsInfo(context.Background(), &runtime.ImageFsInfoRequest{})
require.NoError(t, err)
stats := resp.GetImageFilesystems()
assert.Len(t, stats, 1)
assert.Equal(t, expected, stats[0])
}

View File

@@ -280,3 +280,14 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
}()
return in.criContainerdService.RemoveImage(ctx, r)
}
func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (res *runtime.ImageFsInfoResponse, err error) {
glog.V(4).Infof("ImageFsInfo")
defer func() {
if err != nil {
glog.Errorf("ImageFsInfo failed, error: %v", err)
} else {
glog.V(4).Infof("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
}()
return in.criContainerdService.ImageFsInfo(ctx, r)
}

View File

@@ -20,12 +20,15 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"syscall"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/plugin"
"github.com/cri-o/ocicni/pkg/ocicni"
"github.com/golang/glog"
"golang.org/x/net/context"
@@ -39,6 +42,7 @@ import (
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image"
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
snapshotstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/snapshot"
)
const (
@@ -60,6 +64,8 @@ type CRIContainerdService interface {
type criContainerdService struct {
// config contains all configurations.
config options.Config
// imageFSUUID is the device uuid of image filesystem.
imageFSUUID string
// server is the grpc server.
server *grpc.Server
// os is an interface for all required os operations.
@@ -76,6 +82,8 @@ type criContainerdService struct {
containerNameIndex *registrar.Registrar
// imageStore stores all resources associated with images.
imageStore *imagestore.Store
// snapshotStore stores information of all snapshots.
snapshotStore *snapshotstore.Store
// taskService is containerd tasks client.
taskService tasks.TasksClient
// contentStoreService is the containerd content service client.
@@ -113,6 +121,7 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
sandboxStore: sandboxstore.NewStore(),
containerStore: containerstore.NewStore(),
imageStore: imagestore.NewStore(),
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
taskService: client.TaskService(),
@@ -121,11 +130,16 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
client: client,
}
netPlugin, err := ocicni.InitCNI(config.NetworkPluginConfDir, config.NetworkPluginBinDir)
imageFSPath := imageFSPath(config.ContainerdRootDir, config.ContainerdSnapshotter)
c.imageFSUUID, err = c.getDeviceUUID(imageFSPath)
if err != nil {
return nil, fmt.Errorf("failed to get imagefs uuid: %v", err)
}
c.netPlugin, err = ocicni.InitCNI(config.NetworkPluginConfDir, config.NetworkPluginBinDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize cni plugin: %v", err)
}
c.netPlugin = netPlugin
// prepare streaming server
c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort)
@@ -156,6 +170,15 @@ func (c *criContainerdService) Run() error {
glog.V(2).Info("Start event monitor")
eventMonitorCloseCh := c.eventMonitor.start()
// Start snapshot stats syncer, it doesn't need to be stopped.
glog.V(2).Info("Start snapshots syncer")
snapshotsSyncer := newSnapshotsSyncer(
c.snapshotStore,
c.client.SnapshotService(c.config.ContainerdSnapshotter),
time.Duration(c.config.StatsCollectPeriod)*time.Second,
)
snapshotsSyncer.start()
// Start streaming server.
glog.V(2).Info("Start streaming server")
streamServerCloseCh := make(chan struct{})
@@ -209,3 +232,18 @@ func (c *criContainerdService) Stop() {
c.streamServer.Stop() // nolint: errcheck
c.server.Stop()
}
// getDeviceUUID gets device uuid for a given path.
func (c *criContainerdService) getDeviceUUID(path string) (string, error) {
info, err := c.os.LookupMount(path)
if err != nil {
return "", err
}
return c.os.DeviceUUID(info.Source)
}
// imageFSPath returns containerd image filesystem path.
// Note that if containerd changes directory layout, we also needs to change this.
func imageFSPath(rootDir, snapshotter string) string {
return filepath.Join(rootDir, fmt.Sprintf("%s.%s", plugin.SnapshotPlugin, snapshotter))
}

View File

@@ -24,6 +24,7 @@ import (
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image"
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
snapshotstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/snapshot"
)
const (
@@ -32,6 +33,7 @@ const (
// TODO(random-liu): Change this to image name after we have complete image
// management unit test framework.
testSandboxImage = "sha256:c75bebcdd211f41b3a460c7bf82970ed6c75acaab9cd4c9a4e125b03ca113798"
testImageFSUUID = "test-image-fs-uuid"
)
// newTestCRIContainerdService creates a fake criContainerdService for test.
@@ -41,9 +43,11 @@ func newTestCRIContainerdService() *criContainerdService {
RootDir: testRootDir,
SandboxImage: testSandboxImage,
},
imageFSUUID: testImageFSUUID,
os: ostesting.NewFakeOS(),
sandboxStore: sandboxstore.NewStore(),
imageStore: imagestore.NewStore(),
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerStore: containerstore.NewStore(),
containerNameIndex: registrar.NewRegistrar(),

110
pkg/server/snapshots.go Normal file
View File

@@ -0,0 +1,110 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/snapshot"
"github.com/golang/glog"
snapshotstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/snapshot"
)
// snapshotsSyncer syncs snapshot stats periodically. imagefs info and container stats
// should both use cached result here.
// TODO(random-liu): Benchmark with high workload. We may need a statsSyncer instead if
// benchmark result shows that container cpu/memory stats also need to be cached.
type snapshotsSyncer struct {
store *snapshotstore.Store
snapshotter snapshot.Snapshotter
syncPeriod time.Duration
}
// newSnapshotsSyncer creates a snapshot syncer.
func newSnapshotsSyncer(store *snapshotstore.Store, snapshotter snapshot.Snapshotter,
period time.Duration) *snapshotsSyncer {
return &snapshotsSyncer{
store: store,
snapshotter: snapshotter,
syncPeriod: period,
}
}
// start starts the snapshots syncer. No stop function is needed because
// the syncer doesn't update any persistent states, it's fine to let it
// exit with the process.
func (s *snapshotsSyncer) start() {
tick := time.NewTicker(s.syncPeriod)
go func() {
defer tick.Stop()
// TODO(random-liu): This is expensive. We should do benchmark to
// check the resource usage and optimize this.
for {
if err := s.sync(); err != nil {
glog.Errorf("Failed to sync snapshot stats: %v", err)
}
<-tick.C
}
}()
}
// sync updates all snapshots stats.
func (s *snapshotsSyncer) sync() error {
start := time.Now().UnixNano()
collect := func(ctx context.Context, info snapshot.Info) error {
sn, err := s.store.Get(info.Name)
if err == nil {
// Only update timestamp for non-active snapshot.
if sn.Kind == info.Kind && sn.Kind != snapshot.KindActive {
sn.Timestamp = time.Now().UnixNano()
s.store.Add(sn)
return nil
}
}
// Get newest stats if the snapshot is new or active.
sn = snapshotstore.Snapshot{
Key: info.Name,
Kind: info.Kind,
Timestamp: time.Now().UnixNano(),
}
usage, err := s.snapshotter.Usage(ctx, info.Name)
if err != nil {
if errdefs.IsNotFound(err) {
return nil
}
return err
}
sn.Size = uint64(usage.Size)
sn.Inodes = uint64(usage.Inodes)
s.store.Add(sn)
return nil
}
if err := s.snapshotter.Walk(context.Background(), collect); err != nil {
return err
}
for _, sn := range s.store.List() {
if sn.Timestamp >= start {
continue
}
// Delete the snapshot stats if it's not updated this time.
s.store.Delete(sn.Key)
}
return nil
}