Merge pull request #137 from Random-Liu/cleanup-with-new-client

Some cleanup after switching to new client.
This commit is contained in:
Lantao Liu 2017-08-18 15:04:24 -07:00 committed by GitHub
commit dcc3cb2a05
2 changed files with 47 additions and 69 deletions

View File

@ -23,10 +23,10 @@ import (
"time" "time"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/typeurl"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/sys/unix"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
) )
@ -72,10 +72,21 @@ type execOptions struct {
timeout time.Duration timeout time.Duration
} }
// execResult is the result returned by exec.
type execResult struct {
exitCode uint32
err error
}
// execInContainer executes a command inside the container synchronously, and // execInContainer executes a command inside the container synchronously, and
// redirects stdio stream properly. // redirects stdio stream properly.
// TODO(random-liu): Support timeout.
func (c *criContainerdService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) { func (c *criContainerdService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) {
// Cancel the context before returning to ensure goroutines are stopped.
// This is important, because if `Start` returns error, `Wait` will hang
// forever unless we cancel the context.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Get container from our container store. // Get container from our container store.
cntr, err := c.containerStore.Get(id) cntr, err := c.containerStore.Get(id)
if err != nil { if err != nil {
@ -88,11 +99,7 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state)) return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state))
} }
// TODO(random-liu): Store container client in container store. container := cntr.Container
container, err := c.client.LoadContainer(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to load container: %v", err)
}
spec, err := container.Spec() spec, err := container.Spec()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get container spec: %v", err) return nil, fmt.Errorf("failed to get container spec: %v", err)
@ -120,6 +127,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
return nil, fmt.Errorf("failed to create exec %q: %v", execID, err) return nil, fmt.Errorf("failed to create exec %q: %v", execID, err)
} }
defer func() { defer func() {
// TODO(random-liu): There is a containerd bug here containerd#1376, revisit this
// after that is fixed.
if _, err := process.Delete(ctx); err != nil { if _, err := process.Delete(ctx); err != nil {
glog.Errorf("Failed to delete exec process %q for container %q: %v", execID, id, err) glog.Errorf("Failed to delete exec process %q for container %q: %v", execID, id, err)
} }
@ -131,50 +140,40 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
} }
}) })
// Get containerd event client first, so that we won't miss any events. resCh := make(chan execResult, 1)
// TODO(random-liu): Add filter to only subscribe events of the exec process. go func() {
// TODO(random-liu): Use `Wait` after is fixed. (containerd#1279, containerd#1287) // Wait will return if context is cancelled.
cancellable, cancel := context.WithCancel(ctx) exitCode, err := process.Wait(ctx)
eventstream, err := c.eventService.Subscribe(cancellable, &events.SubscribeRequest{}) resCh <- execResult{
if err != nil { exitCode: exitCode,
return nil, fmt.Errorf("failed to subscribe event stream: %v", err) err: err,
} }
defer cancel() glog.V(2).Infof("Exec process %q exits with exit code %d and error %v", execID, exitCode, err)
}()
if err := process.Start(ctx); err != nil { if err := process.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start exec %q: %v", execID, err) return nil, fmt.Errorf("failed to start exec %q: %v", execID, err)
} }
exitCode, err := c.waitContainerExec(eventstream, id, execID) var timeoutCh <-chan time.Time
if err != nil { if opts.timeout == 0 {
return nil, fmt.Errorf("failed to wait for exec in container %q to finish: %v", id, err) // Do not set timeout if it's 0.
timeoutCh = make(chan time.Time)
} else {
timeoutCh = time.After(opts.timeout)
} }
select {
// Wait for the io to be drained. case <-timeoutCh:
process.IO().Wait() // Ignore the not found error because the process may exit itself before killing.
if err := process.Kill(ctx, unix.SIGKILL); err != nil && !errdefs.IsNotFound(err) {
return exitCode, nil return nil, fmt.Errorf("failed to kill exec %q: %v", execID, err)
}
// waitContainerExec waits for container exec to finish and returns the exit code.
func (c *criContainerdService) waitContainerExec(eventstream events.Events_SubscribeClient, id string,
execID string) (*uint32, error) {
for {
evt, err := eventstream.Recv()
if err != nil {
return nil, err
} }
// Continue until the event received is of type task exit. // Wait for the process to be killed.
if !typeurl.Is(evt.Event, &events.TaskExit{}) { <-resCh
continue return nil, fmt.Errorf("timeout %v exceeded", opts.timeout)
} case res := <-resCh:
any, err := typeurl.UnmarshalAny(evt.Event) if res.err != nil {
if err != nil { return nil, fmt.Errorf("failed to wait for exec %q: %v", execID, res.err)
return nil, err
}
e := any.(*events.TaskExit)
if e.ContainerID == id && e.ID == execID {
return &e.ExitStatus, nil
} }
return &res.exitCode, nil
} }
} }

View File

@ -22,15 +22,10 @@ import (
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/api/services/tasks/v1"
versionapi "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
diffservice "github.com/containerd/containerd/services/diff"
"github.com/containerd/containerd/snapshot"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/kubernetes-incubator/cri-o/pkg/ocicni" "github.com/kubernetes-incubator/cri-o/pkg/ocicni"
healthapi "google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/kubelet/server/streaming"
@ -73,25 +68,15 @@ type criContainerdService struct {
containerNameIndex *registrar.Registrar containerNameIndex *registrar.Registrar
// imageStore stores all resources associated with images. // imageStore stores all resources associated with images.
imageStore *imagestore.Store imageStore *imagestore.Store
// containerService is containerd containers client.
containerService containers.Store
// taskService is containerd tasks client. // taskService is containerd tasks client.
taskService tasks.TasksClient taskService tasks.TasksClient
// contentStoreService is the containerd content service client. // contentStoreService is the containerd content service client.
contentStoreService content.Store contentStoreService content.Store
// snapshotService is the containerd snapshot service client.
snapshotService snapshot.Snapshotter
// diffService is the containerd diff service client.
diffService diffservice.DiffService
// imageStoreService is the containerd service to store and track // imageStoreService is the containerd service to store and track
// image metadata. // image metadata.
imageStoreService images.Store imageStoreService images.Store
// eventsService is the containerd task service client // eventsService is the containerd task service client
eventService events.EventsClient eventService events.EventsClient
// versionService is the containerd version service client.
versionService versionapi.VersionClient
// healthService is the healthcheck service of containerd grpc server.
healthService healthapi.HealthClient
// netPlugin is used to setup and teardown network when run/stop pod sandbox. // netPlugin is used to setup and teardown network when run/stop pod sandbox.
netPlugin ocicni.CNIPlugin netPlugin ocicni.CNIPlugin
// agentFactory is the factory to create agent used in the cri containerd service. // agentFactory is the factory to create agent used in the cri containerd service.
@ -122,18 +107,12 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n
imageStore: imagestore.NewStore(), imageStore: imagestore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(), sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(), containerNameIndex: registrar.NewRegistrar(),
containerService: client.ContainerService(),
taskService: client.TaskService(), taskService: client.TaskService(),
imageStoreService: client.ImageService(), imageStoreService: client.ImageService(),
eventService: client.EventService(), eventService: client.EventService(),
contentStoreService: client.ContentStore(), contentStoreService: client.ContentStore(),
// Use daemon default snapshotter. agentFactory: agents.NewAgentFactory(),
snapshotService: client.SnapshotService(""), client: client,
diffService: client.DiffService(),
versionService: client.VersionService(),
healthService: client.HealthService(),
agentFactory: agents.NewAgentFactory(),
client: client,
} }
netPlugin, err := ocicni.InitCNI(networkPluginConfDir, networkPluginBinDir) netPlugin, err := ocicni.InitCNI(networkPluginConfDir, networkPluginBinDir)