switch to client provided services and address nits

Signed-off-by: Mike Brown <brownwm@us.ibm.com>
This commit is contained in:
Mike Brown
2017-06-20 13:13:12 -05:00
parent 0fe8c17fdf
commit 97063a0e34
19 changed files with 261 additions and 184 deletions

View File

@@ -58,7 +58,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
// the same container.
id := generateID()
name := makeContainerName(config.GetMetadata(), sandboxConfig.GetMetadata())
if err = c.containerNameIndex.Reserve(name, id); err != nil {
if err := c.containerNameIndex.Reserve(name, id); err != nil {
return nil, fmt.Errorf("failed to reserve container name %q: %v", name, err)
}
defer func() {
@@ -101,17 +101,17 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
// Prepare container rootfs.
if config.GetLinux().GetSecurityContext().GetReadonlyRootfs() {
if _, err = c.snapshotService.View(ctx, id, imageMeta.ChainID); err != nil {
if _, err := c.snapshotService.View(ctx, id, imageMeta.ChainID); err != nil {
return nil, fmt.Errorf("failed to view container rootfs %q: %v", imageMeta.ChainID, err)
}
} else {
if _, err = c.snapshotService.Prepare(ctx, id, imageMeta.ChainID); err != nil {
if _, err := c.snapshotService.Prepare(ctx, id, imageMeta.ChainID); err != nil {
return nil, fmt.Errorf("failed to prepare container rootfs %q: %v", imageMeta.ChainID, err)
}
}
defer func() {
if retErr != nil {
if err = c.snapshotService.Remove(ctx, id); err != nil {
if err := c.snapshotService.Remove(ctx, id); err != nil {
glog.Errorf("Failed to remove container snapshot %q: %v", id, err)
}
}
@@ -120,14 +120,14 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
// Create container root directory.
containerRootDir := getContainerRootDir(c.rootDir, id)
if err = c.os.MkdirAll(containerRootDir, 0755); err != nil {
if err := c.os.MkdirAll(containerRootDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create container root directory %q: %v",
containerRootDir, err)
}
defer func() {
if retErr != nil {
// Cleanup the container root directory.
if err = c.os.RemoveAll(containerRootDir); err != nil {
if err := c.os.RemoveAll(containerRootDir); err != nil {
glog.Errorf("Failed to remove container root directory %q: %v",
containerRootDir, err)
}

View File

@@ -552,11 +552,11 @@ func TestCreateContainer(t *testing.T) {
"container name should be released")
}
assert.Empty(t, fakeSnapshotClient.ListMounts(), "snapshot should be cleaned up")
listResp, listerr := fake.List(context.Background(), &containers.ListContainersRequest{})
assert.NoError(t, listerr)
listResp, err := fake.List(context.Background(), &containers.ListContainersRequest{})
assert.NoError(t, err)
assert.Empty(t, listResp.Containers, "containerd container should be cleaned up")
metas, metaserr := c.containerStore.List()
assert.NoError(t, metaserr)
metas, err := c.containerStore.List()
assert.NoError(t, err)
assert.Empty(t, metas, "container metadata should not be created")
continue
}

View File

@@ -202,8 +202,8 @@ func TestRemoveContainer(t *testing.T) {
if !test.expectUnsetRemoving {
continue
}
meta, metaerr := c.containerStore.Get(testID)
assert.NoError(t, metaerr)
meta, err := c.containerStore.Get(testID)
assert.NoError(t, err)
require.NotNil(t, meta)
// Also covers resetContainerRemoving.
assert.False(t, meta.Removing, "removing state should be unset")

View File

@@ -137,12 +137,12 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me
if config.GetLogPath() != "" {
// Only generate container log when log path is specified.
logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath())
if err = c.agentFactory.NewContainerLogger(logPath, agents.Stdout, stdoutPipe).Start(); err != nil {
if err := c.agentFactory.NewContainerLogger(logPath, agents.Stdout, stdoutPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stdout logger: %v", err)
}
// Only redirect stderr when there is no tty.
if !config.GetTty() {
if err = c.agentFactory.NewContainerLogger(logPath, agents.Stderr, stderrPipe).Start(); err != nil {
if err := c.agentFactory.NewContainerLogger(logPath, agents.Stderr, stderrPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stderr logger: %v", err)
}
}

View File

@@ -223,7 +223,7 @@ func TestStartContainer(t *testing.T) {
assert.EqualValues(t, errorStartExitCode, meta.ExitCode)
assert.Equal(t, errorStartReason, meta.Reason)
assert.NotEmpty(t, meta.Message)
_, err = fake.Info(context.Background(), &execution.InfoRequest{ContainerID: testID})
_, err := fake.Info(context.Background(), &execution.InfoRequest{ContainerID: testID})
assert.True(t, isContainerdGRPCNotFoundError(err),
"containerd task should be cleaned up when fail to start")
continue

View File

@@ -346,21 +346,21 @@ func (c *criContainerdService) localResolve(ctx context.Context, ref string) (*m
_, err := imagedigest.Parse(ref)
if err != nil {
// ref is not image id, try to resolve it locally.
normalized, nerr := normalizeImageRef(ref)
normalized, err := normalizeImageRef(ref)
if err != nil {
return nil, fmt.Errorf("invalid image reference %q: %v", ref, nerr)
return nil, fmt.Errorf("invalid image reference %q: %v", ref, err)
}
image, gerr := c.imageStoreService.Get(ctx, normalized.String())
if gerr != nil {
if containerdmetadata.IsNotFound(gerr) {
image, err := c.imageStoreService.Get(ctx, normalized.String())
if err != nil {
if containerdmetadata.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("an error occurred when getting image %q from containerd image store: %v",
normalized.String(), gerr)
normalized.String(), err)
}
desc, cerr := image.Config(ctx, c.contentStoreService)
if cerr != nil {
return nil, fmt.Errorf("failed to get image config descriptor: %v", cerr)
desc, err := image.Config(ctx, c.contentStoreService)
if err != nil {
return nil, fmt.Errorf("failed to get image config descriptor: %v", err)
}
ref = desc.Digest.String()
}

View File

@@ -118,7 +118,7 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
// recover in-memory state during startup.
if err == nil {
// Update existing image metadata.
if err = c.imageMetadataStore.Update(imageID, func(m metadata.ImageMetadata) (metadata.ImageMetadata, error) {
if err := c.imageMetadataStore.Update(imageID, func(m metadata.ImageMetadata) (metadata.ImageMetadata, error) {
updateImageMetadata(&m, repoTag, repoDigest)
return m, nil
}); err != nil {
@@ -246,7 +246,7 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
glog.V(5).Infof("Dispatch for %q returns error: %v", ref, err)
}
// Wait for the image pulling to finish
if err = c.waitForResourcesDownloading(ctx, resources.all()); err != nil {
if err := c.waitForResourcesDownloading(ctx, resources.all()); err != nil {
return "", "", fmt.Errorf("failed to wait for image %q downloading: %v", ref, err)
}
glog.V(4).Infof("Finished downloading resources for image %q", ref)
@@ -283,7 +283,7 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
return "", "", fmt.Errorf("readblob failed for manifest digest %q: %v", manifestDigest, err)
}
var manifest imagespec.Manifest
if err = json.Unmarshal(p, &manifest); err != nil {
if err := json.Unmarshal(p, &manifest); err != nil {
return "", "", fmt.Errorf("unmarshal blob to manifest failed for manifest digest %q: %v",
manifestDigest, err)
}

View File

@@ -64,7 +64,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
}
// Remove sandbox container snapshot.
if err = c.snapshotService.Remove(ctx, id); err != nil {
if err := c.snapshotService.Remove(ctx, id); err != nil {
if !snapshot.IsNotExist(err) {
return nil, fmt.Errorf("failed to remove sandbox container snapshot %q: %v", id, err)
}

View File

@@ -94,7 +94,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
}
defer func() {
if retErr != nil {
if err = c.snapshotService.Remove(ctx, id); err != nil {
if err := c.snapshotService.Remove(ctx, id); err != nil {
glog.Errorf("Failed to remove sandbox container snapshot %q: %v", id, err)
}
}
@@ -135,7 +135,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
}
defer func() {
if retErr != nil {
if _, err = c.containerService.Delete(ctx, &containers.DeleteContainerRequest{ID: id}); err != nil {
if _, err := c.containerService.Delete(ctx, &containers.DeleteContainerRequest{ID: id}); err != nil {
glog.Errorf("Failed to delete containerd container%q: %v", id, err)
}
}
@@ -144,14 +144,14 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
// Create sandbox container root directory.
// Prepare streaming named pipe.
sandboxRootDir := getSandboxRootDir(c.rootDir, id)
if err = c.os.MkdirAll(sandboxRootDir, 0755); err != nil {
if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create sandbox root directory %q: %v",
sandboxRootDir, err)
}
defer func() {
if retErr != nil {
// Cleanup the sandbox root directory.
if err = c.os.RemoveAll(sandboxRootDir); err != nil {
if err := c.os.RemoveAll(sandboxRootDir); err != nil {
glog.Errorf("Failed to remove sandbox root directory %q: %v",
sandboxRootDir, err)
}
@@ -170,10 +170,10 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
stderrPipe.Close()
}
}()
if err = c.agentFactory.NewSandboxLogger(stdoutPipe).Start(); err != nil {
if err := c.agentFactory.NewSandboxLogger(stdoutPipe).Start(); err != nil {
return nil, fmt.Errorf("failed to start sandbox stdout logger: %v", err)
}
if err = c.agentFactory.NewSandboxLogger(stderrPipe).Start(); err != nil {
if err := c.agentFactory.NewSandboxLogger(stderrPipe).Start(); err != nil {
return nil, fmt.Errorf("failed to start sandbox stderr logger: %v", err)
}

View File

@@ -21,22 +21,15 @@ import (
"net"
"os"
"syscall"
"time"
"github.com/containerd/containerd/namespaces"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
"k8s.io/kubernetes/pkg/util/interrupt"
)
const (
// unixProtocol is the network protocol of unix socket.
unixProtocol = "unix"
// k8sContainerdNamespace is the namespace we use to connect containerd.
k8sContainerdNamespace = "k8s.io"
)
// unixProtocol is the network protocol of unix socket.
const unixProtocol = "unix"
// CRIContainerdServer is the grpc server of cri-containerd.
type CRIContainerdServer struct {
@@ -79,38 +72,3 @@ func (s *CRIContainerdServer) Run() error {
h := interrupt.New(nil, s.server.Stop)
return h.Run(func() error { return s.server.Serve(l) })
}
// ConnectToContainerd returns a grpc client for containerd.
func ConnectToContainerd(path string, connectionTimeout time.Duration) (*grpc.ClientConn, error) {
// get the containerd client
dialOpts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithTimeout(connectionTimeout),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(unixProtocol, path, timeout)
}),
grpc.WithUnaryInterceptor(grpc.UnaryClientInterceptor(unary)),
grpc.WithStreamInterceptor(grpc.StreamClientInterceptor(stream)),
}
return grpc.Dial(fmt.Sprintf("%s://%s", unixProtocol, path), dialOpts...)
}
// TODO(random-liu): Get rid of following functions after switching to containerd client.
// unary is a wrapper to apply kubernetes namespace in each grpc unary call.
func unary(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
_, ok := namespaces.Namespace(ctx)
if !ok {
ctx = namespaces.WithNamespace(ctx, k8sContainerdNamespace)
}
return invoker(ctx, method, req, reply, cc, opts...)
}
// stream is a wrapper to apply kubernetes namespace in each grpc stream call.
func stream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
_, ok := namespaces.Namespace(ctx)
if !ok {
ctx = namespaces.WithNamespace(ctx, k8sContainerdNamespace)
}
return streamer(ctx, desc, cc, method, opts...)
}

View File

@@ -21,22 +21,14 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/containers"
contentapi "github.com/containerd/containerd/api/services/content"
diffapi "github.com/containerd/containerd/api/services/diff"
"github.com/containerd/containerd/api/services/execution"
imagesapi "github.com/containerd/containerd/api/services/images"
snapshotapi "github.com/containerd/containerd/api/services/snapshot"
versionapi "github.com/containerd/containerd/api/services/version"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
contentservice "github.com/containerd/containerd/services/content"
diffservice "github.com/containerd/containerd/services/diff"
imagesservice "github.com/containerd/containerd/services/images"
snapshotservice "github.com/containerd/containerd/services/snapshot"
"github.com/containerd/containerd/snapshot"
"github.com/docker/docker/pkg/truncindex"
"github.com/kubernetes-incubator/cri-o/pkg/ocicni"
"google.golang.org/grpc"
healthapi "google.golang.org/grpc/health/grpc_health_v1"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
@@ -47,6 +39,9 @@ import (
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
)
// k8sContainerdNamespace is the namespace we use to connect containerd.
const k8sContainerdNamespace = "k8s.io"
// CRIContainerdService is the interface implement CRI remote service server.
type CRIContainerdService interface {
Start()
@@ -105,12 +100,12 @@ type criContainerdService struct {
}
// NewCRIContainerdService returns a new instance of CRIContainerdService
func NewCRIContainerdService(conn *grpc.ClientConn, containerdEndpoint, rootDir, networkPluginBinDir, networkPluginConfDir string) (CRIContainerdService, error) {
func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, networkPluginConfDir string) (CRIContainerdService, error) {
// TODO(random-liu): [P2] Recover from runtime state and metadata store.
client, err := containerd.New(containerdEndpoint)
client, err := containerd.New(containerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
if err != nil {
return nil, fmt.Errorf("failed to initialize client: %v", err)
return nil, fmt.Errorf("failed to initialize containerd client with endpoint %q: %v", containerdEndpoint, err)
}
c := &criContainerdService{
@@ -126,14 +121,14 @@ func NewCRIContainerdService(conn *grpc.ClientConn, containerdEndpoint, rootDir,
sandboxIDIndex: truncindex.NewTruncIndex(nil),
// TODO(random-liu): Add container id index.
containerNameIndex: registrar.NewRegistrar(),
containerService: containers.NewContainersClient(conn),
taskService: execution.NewTasksClient(conn),
imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)),
contentStoreService: contentservice.NewStoreFromClient(contentapi.NewContentClient(conn)),
snapshotService: snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(conn)),
diffService: diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(conn)),
versionService: versionapi.NewVersionClient(conn),
healthService: healthapi.NewHealthClient(conn),
containerService: client.ContainerService(),
taskService: client.TaskService(),
imageStoreService: client.ImageService(),
contentStoreService: client.ContentStore(),
snapshotService: client.SnapshotService(),
diffService: client.DiffService(),
versionService: client.VersionService(),
healthService: client.HealthService(),
agentFactory: agents.NewAgentFactory(),
client: client,
}