Add container Exec support.
Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
parent
8b56c91ec5
commit
54286313ce
@ -38,7 +38,14 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.SocketPath)
|
glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.SocketPath)
|
||||||
service, err := server.NewCRIContainerdService(o.ContainerdEndpoint, o.RootDir, o.NetworkPluginBinDir, o.NetworkPluginConfDir)
|
service, err := server.NewCRIContainerdService(
|
||||||
|
o.ContainerdEndpoint,
|
||||||
|
o.RootDir,
|
||||||
|
o.NetworkPluginBinDir,
|
||||||
|
o.NetworkPluginConfDir,
|
||||||
|
o.StreamServerAddress,
|
||||||
|
o.StreamServerPort,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err)
|
glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err)
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,10 @@ type CRIContainerdOptions struct {
|
|||||||
NetworkPluginBinDir string
|
NetworkPluginBinDir string
|
||||||
// NetworkPluginConfDir is the directory in which the admin places a CNI conf.
|
// NetworkPluginConfDir is the directory in which the admin places a CNI conf.
|
||||||
NetworkPluginConfDir string
|
NetworkPluginConfDir string
|
||||||
|
// StreamServerAddress is the ip address streaming server is listening on.
|
||||||
|
StreamServerAddress string
|
||||||
|
// StreamServerPort is the port streaming server is listening on.
|
||||||
|
StreamServerPort string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCRIContainerdOptions returns a reference to CRIContainerdOptions
|
// NewCRIContainerdOptions returns a reference to CRIContainerdOptions
|
||||||
@ -63,6 +67,10 @@ func (c *CRIContainerdOptions) AddFlags(fs *pflag.FlagSet) {
|
|||||||
"/etc/cni/net.d", "The directory for putting network binaries.")
|
"/etc/cni/net.d", "The directory for putting network binaries.")
|
||||||
fs.StringVar(&c.NetworkPluginConfDir, "network-conf-dir",
|
fs.StringVar(&c.NetworkPluginConfDir, "network-conf-dir",
|
||||||
"/opt/cni/bin", "The directory for putting network plugin configuration files.")
|
"/opt/cni/bin", "The directory for putting network plugin configuration files.")
|
||||||
|
fs.StringVar(&c.StreamServerAddress, "stream-addr",
|
||||||
|
"", "The ip address streaming server is listening on. Default host interface is used if this is empty.")
|
||||||
|
fs.StringVar(&c.StreamServerPort, "stream-port",
|
||||||
|
"10010", "The port streaming server is listening on.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitFlags must be called after adding all cli options flags are defined and
|
// InitFlags must be called after adding all cli options flags are defined and
|
||||||
|
@ -17,14 +17,30 @@ limitations under the License.
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
|
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
|
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
|
||||||
func (c *criContainerdService) Exec(ctx context.Context, r *runtime.ExecRequest) (*runtime.ExecResponse, error) {
|
func (c *criContainerdService) Exec(ctx context.Context, r *runtime.ExecRequest) (retRes *runtime.ExecResponse, retErr error) {
|
||||||
return nil, errors.New("not implemented")
|
glog.V(2).Infof("Exec for %q with command %+v, tty %v and stdin %v",
|
||||||
|
r.GetContainerId(), r.GetCmd(), r.GetTty(), r.GetStdin())
|
||||||
|
defer func() {
|
||||||
|
if retErr == nil {
|
||||||
|
glog.V(2).Infof("Exec for %q returns URL %q", r.GetContainerId(), retRes.Url)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
cntr, err := c.containerStore.Get(r.GetContainerId())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to find container in store: %v", err)
|
||||||
|
}
|
||||||
|
state := cntr.Status.Get().State()
|
||||||
|
if state != runtime.ContainerState_CONTAINER_RUNNING {
|
||||||
|
return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state))
|
||||||
|
}
|
||||||
|
return c.streamServer.GetExec(r)
|
||||||
}
|
}
|
||||||
|
@ -43,10 +43,9 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var stdin, stdout, stderr bytes.Buffer
|
var stdout, stderr bytes.Buffer
|
||||||
exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{
|
exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{
|
||||||
cmd: r.GetCmd(),
|
cmd: r.GetCmd(),
|
||||||
stdin: &stdin,
|
|
||||||
stdout: &stdout,
|
stdout: &stdout,
|
||||||
stderr: &stderr,
|
stderr: &stderr,
|
||||||
timeout: time.Duration(r.GetTimeout()) * time.Second,
|
timeout: time.Duration(r.GetTimeout()) * time.Second,
|
||||||
@ -106,6 +105,10 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
|
|||||||
pspec.Args = opts.cmd
|
pspec.Args = opts.cmd
|
||||||
pspec.Terminal = opts.tty
|
pspec.Terminal = opts.tty
|
||||||
|
|
||||||
|
if opts.stdin == nil {
|
||||||
|
// Create empty buffer if stdin is nil.
|
||||||
|
opts.stdin = new(bytes.Buffer)
|
||||||
|
}
|
||||||
execID := generateID()
|
execID := generateID()
|
||||||
process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal(
|
process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal(
|
||||||
opts.stdin,
|
opts.stdin,
|
||||||
|
@ -28,9 +28,11 @@ import (
|
|||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
diffservice "github.com/containerd/containerd/services/diff"
|
diffservice "github.com/containerd/containerd/services/diff"
|
||||||
"github.com/containerd/containerd/snapshot"
|
"github.com/containerd/containerd/snapshot"
|
||||||
|
"github.com/golang/glog"
|
||||||
"github.com/kubernetes-incubator/cri-o/pkg/ocicni"
|
"github.com/kubernetes-incubator/cri-o/pkg/ocicni"
|
||||||
healthapi "google.golang.org/grpc/health/grpc_health_v1"
|
healthapi "google.golang.org/grpc/health/grpc_health_v1"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
|
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
||||||
|
|
||||||
osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os"
|
osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os"
|
||||||
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
|
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
|
||||||
@ -84,6 +86,8 @@ type criContainerdService struct {
|
|||||||
// imageStoreService is the containerd service to store and track
|
// imageStoreService is the containerd service to store and track
|
||||||
// image metadata.
|
// image metadata.
|
||||||
imageStoreService images.Store
|
imageStoreService images.Store
|
||||||
|
// eventsService is the containerd task service client
|
||||||
|
eventService events.EventsClient
|
||||||
// versionService is the containerd version service client.
|
// versionService is the containerd version service client.
|
||||||
versionService versionapi.VersionClient
|
versionService versionapi.VersionClient
|
||||||
// healthService is the healthcheck service of containerd grpc server.
|
// healthService is the healthcheck service of containerd grpc server.
|
||||||
@ -94,12 +98,14 @@ type criContainerdService struct {
|
|||||||
agentFactory agents.AgentFactory
|
agentFactory agents.AgentFactory
|
||||||
// client is an instance of the containerd client
|
// client is an instance of the containerd client
|
||||||
client *containerd.Client
|
client *containerd.Client
|
||||||
// eventsService is the containerd task service client
|
// streamServer is the streaming server serves container streaming request.
|
||||||
eventService events.EventsClient
|
streamServer streaming.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCRIContainerdService returns a new instance of CRIContainerdService
|
// NewCRIContainerdService returns a new instance of CRIContainerdService
|
||||||
func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, networkPluginConfDir string) (CRIContainerdService, error) {
|
// TODO(random-liu): Add cri-containerd server config to get rid of the long arg list.
|
||||||
|
func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, networkPluginConfDir,
|
||||||
|
streamAddress, streamPort string) (CRIContainerdService, error) {
|
||||||
// TODO(random-liu): [P2] Recover from runtime state and checkpoint.
|
// TODO(random-liu): [P2] Recover from runtime state and checkpoint.
|
||||||
|
|
||||||
client, err := containerd.New(containerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
|
client, err := containerd.New(containerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
|
||||||
@ -119,6 +125,7 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n
|
|||||||
containerService: client.ContainerService(),
|
containerService: client.ContainerService(),
|
||||||
taskService: client.TaskService(),
|
taskService: client.TaskService(),
|
||||||
imageStoreService: client.ImageService(),
|
imageStoreService: client.ImageService(),
|
||||||
|
eventService: client.EventService(),
|
||||||
contentStoreService: client.ContentStore(),
|
contentStoreService: client.ContentStore(),
|
||||||
// Use daemon default snapshotter.
|
// Use daemon default snapshotter.
|
||||||
snapshotService: client.SnapshotService(""),
|
snapshotService: client.SnapshotService(""),
|
||||||
@ -127,7 +134,6 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n
|
|||||||
healthService: client.HealthService(),
|
healthService: client.HealthService(),
|
||||||
agentFactory: agents.NewAgentFactory(),
|
agentFactory: agents.NewAgentFactory(),
|
||||||
client: client,
|
client: client,
|
||||||
eventService: client.EventService(),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)
|
netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)
|
||||||
@ -136,9 +142,20 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n
|
|||||||
}
|
}
|
||||||
c.netPlugin = netPlugin
|
c.netPlugin = netPlugin
|
||||||
|
|
||||||
|
// prepare streaming server
|
||||||
|
c.streamServer, err = newStreamServer(c, streamAddress, streamPort)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create stream server: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *criContainerdService) Start() {
|
func (c *criContainerdService) Start() {
|
||||||
c.startEventMonitor()
|
c.startEventMonitor()
|
||||||
|
go func() {
|
||||||
|
if err := c.streamServer.Start(true); err != nil {
|
||||||
|
glog.Errorf("Failed to start streaming server: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
@ -17,11 +17,75 @@ limitations under the License.
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
k8snet "k8s.io/apimachinery/pkg/util/net"
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/client-go/tools/remotecommand"
|
"k8s.io/client-go/tools/remotecommand"
|
||||||
_ "k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
||||||
|
// TODO(random-liu): k8s.io/utils/exec after bump up kubernetes version.
|
||||||
|
"k8s.io/kubernetes/pkg/util/exec"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func newStreamServer(c *criContainerdService, addr, port string) (streaming.Server, error) {
|
||||||
|
if addr == "" {
|
||||||
|
a, err := k8snet.ChooseBindAddress(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get stream server address: %v", err)
|
||||||
|
}
|
||||||
|
addr = a.String()
|
||||||
|
}
|
||||||
|
config := streaming.DefaultConfig
|
||||||
|
config.Addr = net.JoinHostPort(addr, port)
|
||||||
|
runtime := newStreamRuntime(c)
|
||||||
|
return streaming.NewServer(config, runtime)
|
||||||
|
}
|
||||||
|
|
||||||
|
type streamRuntime struct {
|
||||||
|
c *criContainerdService
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStreamRuntime(c *criContainerdService) streaming.Runtime {
|
||||||
|
return &streamRuntime{c: c}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exec executes a command inside the container. exec.ExitError is returned if the command
|
||||||
|
// returns non-zero exit code.
|
||||||
|
func (s *streamRuntime) Exec(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser,
|
||||||
|
tty bool, resize <-chan remotecommand.TerminalSize) error {
|
||||||
|
exitCode, err := s.c.execInContainer(context.Background(), containerID, execOptions{
|
||||||
|
cmd: cmd,
|
||||||
|
stdin: stdin,
|
||||||
|
stdout: stdout,
|
||||||
|
stderr: stderr,
|
||||||
|
tty: tty,
|
||||||
|
resize: resize,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to exec in container: %v", err)
|
||||||
|
}
|
||||||
|
if *exitCode == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &exec.CodeExitError{
|
||||||
|
Err: fmt.Errorf("error executing command %v, exit code %d", cmd, *exitCode),
|
||||||
|
Code: int(*exitCode),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamRuntime) Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool,
|
||||||
|
resize <-chan remotecommand.TerminalSize) error {
|
||||||
|
return errors.New("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
|
||||||
|
return errors.New("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each
|
// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each
|
||||||
// remotecommand.TerminalSize received from the channel. The resize channel must be closed elsewhere to stop the
|
// remotecommand.TerminalSize received from the channel. The resize channel must be closed elsewhere to stop the
|
||||||
// goroutine.
|
// goroutine.
|
||||||
|
Loading…
Reference in New Issue
Block a user