From ed640d3972f0bf1d62c029eee97ffa5d1afd202c Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Thu, 17 Aug 2017 00:43:36 +0000 Subject: [PATCH] Some cleanup after switching to new client. Signed-off-by: Lantao Liu --- pkg/server/container_execsync.go | 91 ++++++++++++++++---------------- pkg/server/service.go | 25 +-------- 2 files changed, 47 insertions(+), 69 deletions(-) diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index e7f267e84..17be309e3 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -23,10 +23,10 @@ import ( "time" "github.com/containerd/containerd" - "github.com/containerd/containerd/api/services/events/v1" - "github.com/containerd/containerd/typeurl" + "github.com/containerd/containerd/errdefs" "github.com/golang/glog" "golang.org/x/net/context" + "golang.org/x/sys/unix" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) @@ -72,10 +72,21 @@ type execOptions struct { timeout time.Duration } +// execResult is the result returned by exec. +type execResult struct { + exitCode uint32 + err error +} + // execInContainer executes a command inside the container synchronously, and // redirects stdio stream properly. -// TODO(random-liu): Support timeout. func (c *criContainerdService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) { + // Cancel the context before returning to ensure goroutines are stopped. + // This is important, because if `Start` returns error, `Wait` will hang + // forever unless we cancel the context. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + // Get container from our container store. cntr, err := c.containerStore.Get(id) if err != nil { @@ -88,11 +99,7 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state)) } - // TODO(random-liu): Store container client in container store. - container, err := c.client.LoadContainer(ctx, id) - if err != nil { - return nil, fmt.Errorf("failed to load container: %v", err) - } + container := cntr.Container spec, err := container.Spec() if err != nil { return nil, fmt.Errorf("failed to get container spec: %v", err) @@ -120,6 +127,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o return nil, fmt.Errorf("failed to create exec %q: %v", execID, err) } defer func() { + // TODO(random-liu): There is a containerd bug here containerd#1376, revisit this + // after that is fixed. if _, err := process.Delete(ctx); err != nil { glog.Errorf("Failed to delete exec process %q for container %q: %v", execID, id, err) } @@ -131,50 +140,40 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o } }) - // Get containerd event client first, so that we won't miss any events. - // TODO(random-liu): Add filter to only subscribe events of the exec process. - // TODO(random-liu): Use `Wait` after is fixed. (containerd#1279, containerd#1287) - cancellable, cancel := context.WithCancel(ctx) - eventstream, err := c.eventService.Subscribe(cancellable, &events.SubscribeRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to subscribe event stream: %v", err) - } - defer cancel() - + resCh := make(chan execResult, 1) + go func() { + // Wait will return if context is cancelled. + exitCode, err := process.Wait(ctx) + resCh <- execResult{ + exitCode: exitCode, + err: err, + } + glog.V(2).Infof("Exec process %q exits with exit code %d and error %v", execID, exitCode, err) + }() if err := process.Start(ctx); err != nil { return nil, fmt.Errorf("failed to start exec %q: %v", execID, err) } - exitCode, err := c.waitContainerExec(eventstream, id, execID) - if err != nil { - return nil, fmt.Errorf("failed to wait for exec in container %q to finish: %v", id, err) + var timeoutCh <-chan time.Time + if opts.timeout == 0 { + // Do not set timeout if it's 0. + timeoutCh = make(chan time.Time) + } else { + timeoutCh = time.After(opts.timeout) } - - // Wait for the io to be drained. - process.IO().Wait() - - return exitCode, nil -} - -// waitContainerExec waits for container exec to finish and returns the exit code. -func (c *criContainerdService) waitContainerExec(eventstream events.Events_SubscribeClient, id string, - execID string) (*uint32, error) { - for { - evt, err := eventstream.Recv() - if err != nil { - return nil, err + select { + case <-timeoutCh: + // Ignore the not found error because the process may exit itself before killing. + if err := process.Kill(ctx, unix.SIGKILL); err != nil && !errdefs.IsNotFound(err) { + return nil, fmt.Errorf("failed to kill exec %q: %v", execID, err) } - // Continue until the event received is of type task exit. - if !typeurl.Is(evt.Event, &events.TaskExit{}) { - continue - } - any, err := typeurl.UnmarshalAny(evt.Event) - if err != nil { - return nil, err - } - e := any.(*events.TaskExit) - if e.ContainerID == id && e.ID == execID { - return &e.ExitStatus, nil + // Wait for the process to be killed. + <-resCh + return nil, fmt.Errorf("timeout %v exceeded", opts.timeout) + case res := <-resCh: + if res.err != nil { + return nil, fmt.Errorf("failed to wait for exec %q: %v", execID, res.err) } + return &res.exitCode, nil } } diff --git a/pkg/server/service.go b/pkg/server/service.go index 3bf193cc7..3400c1974 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -22,15 +22,10 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/services/tasks/v1" - versionapi "github.com/containerd/containerd/api/services/version/v1" - "github.com/containerd/containerd/containers" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" - diffservice "github.com/containerd/containerd/services/diff" - "github.com/containerd/containerd/snapshot" "github.com/golang/glog" "github.com/kubernetes-incubator/cri-o/pkg/ocicni" - healthapi "google.golang.org/grpc/health/grpc_health_v1" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/server/streaming" @@ -73,25 +68,15 @@ type criContainerdService struct { containerNameIndex *registrar.Registrar // imageStore stores all resources associated with images. imageStore *imagestore.Store - // containerService is containerd containers client. - containerService containers.Store // taskService is containerd tasks client. taskService tasks.TasksClient // contentStoreService is the containerd content service client. contentStoreService content.Store - // snapshotService is the containerd snapshot service client. - snapshotService snapshot.Snapshotter - // diffService is the containerd diff service client. - diffService diffservice.DiffService // imageStoreService is the containerd service to store and track // image metadata. imageStoreService images.Store // eventsService is the containerd task service client eventService events.EventsClient - // versionService is the containerd version service client. - versionService versionapi.VersionClient - // healthService is the healthcheck service of containerd grpc server. - healthService healthapi.HealthClient // netPlugin is used to setup and teardown network when run/stop pod sandbox. netPlugin ocicni.CNIPlugin // agentFactory is the factory to create agent used in the cri containerd service. @@ -122,18 +107,12 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n imageStore: imagestore.NewStore(), sandboxNameIndex: registrar.NewRegistrar(), containerNameIndex: registrar.NewRegistrar(), - containerService: client.ContainerService(), taskService: client.TaskService(), imageStoreService: client.ImageService(), eventService: client.EventService(), contentStoreService: client.ContentStore(), - // Use daemon default snapshotter. - snapshotService: client.SnapshotService(""), - diffService: client.DiffService(), - versionService: client.VersionService(), - healthService: client.HealthService(), - agentFactory: agents.NewAgentFactory(), - client: client, + agentFactory: agents.NewAgentFactory(), + client: client, } netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)