
Automatic merge from submit-queue (batch tested with PRs 45809, 46515, 46484, 46516, 45614) CRI: add methods for container stats **What this PR does / why we need it**: Define methods in CRI to get container stats. **Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: Part of https://github.com/kubernetes/features/issues/290; addresses #27097 **Special notes for your reviewer**: This PR defines the *minimum required* container metrics for the existing components to function, loosely based on the previous discussion on [core metrics](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/core-metrics-pipeline.md) as well as the existing cadvisor/summary APIs. Two new RPC calls are added to the RuntimeService: `ContainerStats` and `ListContainerStats`. The former retrieves stats for a given container, while the latter gets stats for all containers in one call. The stats gathering time of each subsystem can vary substantially (e.g., cpu vs. disk), so even though the on-demand model is preferred due to its simplicity, we’d rather give the container runtime more flexibility to determine the collection frequency for each subsystem*. As a trade-off, each piece of stats for the subsystem must contain a timestamp to let kubelet know how fresh/recent the stats are. In the future, we should also recommend a guideline for how recent the stats should be in order to ensure the reliability (e.g., eviction) and the responsiveness (e.g., autoscaling) of the kubernetes cluster. The next step is to plumb this through kubelet so that kubelet can choose to consume container stats from CRI or cadvisor. *Alternatively, we can add calls to get stats of individual subsystems.
However, kubelet does not have the complete knowledge of the runtime environment, so this would only lead to unnecessary complexity in kubelet. **Release note**: ```release-note Augment CRI to support retrieving container stats from the runtime. ```
431 lines
14 KiB
Go
431 lines
14 KiB
Go
/*
|
|
Copyright 2016 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package remote
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/golang/glog"
|
|
"google.golang.org/grpc"
|
|
|
|
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
|
|
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
|
|
"k8s.io/kubernetes/pkg/kubelet/util"
|
|
utilexec "k8s.io/kubernetes/pkg/util/exec"
|
|
)
|
|
|
|
// RemoteRuntimeService is a gRPC implementation of internalapi.RuntimeService.
|
|
type RemoteRuntimeService struct {
|
|
timeout time.Duration
|
|
runtimeClient runtimeapi.RuntimeServiceClient
|
|
}
|
|
|
|
// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
|
|
func NewRemoteRuntimeService(endpoint string, connectionTimout time.Duration) (internalapi.RuntimeService, error) {
|
|
glog.Infof("Connecting to runtime service %s", endpoint)
|
|
addr, dailer, err := util.GetAddressAndDialer(endpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(connectionTimout), grpc.WithDialer(dailer))
|
|
if err != nil {
|
|
glog.Errorf("Connect remote runtime %s failed: %v", addr, err)
|
|
return nil, err
|
|
}
|
|
|
|
return &RemoteRuntimeService{
|
|
timeout: connectionTimout,
|
|
runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
|
|
}, nil
|
|
}
|
|
|
|
// Version returns the runtime name, runtime version and runtime API version.
|
|
func (r *RemoteRuntimeService) Version(apiVersion string) (*runtimeapi.VersionResponse, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
typedVersion, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{
|
|
Version: apiVersion,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("Version from runtime service failed: %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
if typedVersion.Version == "" || typedVersion.RuntimeName == "" || typedVersion.RuntimeApiVersion == "" || typedVersion.RuntimeVersion == "" {
|
|
return nil, fmt.Errorf("not all fields are set in VersionResponse (%q)", *typedVersion)
|
|
}
|
|
|
|
return typedVersion, err
|
|
}
|
|
|
|
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
|
|
// the sandbox is in ready state.
|
|
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (string, error) {
|
|
// Use 2 times longer timeout for sandbox operation (4 mins by default)
|
|
// TODO: Make the pod sandbox timeout configurable.
|
|
ctx, cancel := getContextWithTimeout(r.timeout * 2)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
|
|
Config: config,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("RunPodSandbox from runtime service failed: %v", err)
|
|
return "", err
|
|
}
|
|
|
|
if resp.PodSandboxId == "" {
|
|
errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
|
|
glog.Errorf("RunPodSandbox failed: %s", errorMessage)
|
|
return "", errors.New(errorMessage)
|
|
}
|
|
|
|
return resp.PodSandboxId, nil
|
|
}
|
|
|
|
// StopPodSandbox stops the sandbox. If there are any running containers in the
|
|
// sandbox, they should be forced to termination.
|
|
func (r *RemoteRuntimeService) StopPodSandbox(podSandBoxID string) error {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
_, err := r.runtimeClient.StopPodSandbox(ctx, &runtimeapi.StopPodSandboxRequest{
|
|
PodSandboxId: podSandBoxID,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("StopPodSandbox %q from runtime service failed: %v", podSandBoxID, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RemovePodSandbox removes the sandbox. If there are any containers in the
|
|
// sandbox, they should be forcibly removed.
|
|
func (r *RemoteRuntimeService) RemovePodSandbox(podSandBoxID string) error {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
_, err := r.runtimeClient.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{
|
|
PodSandboxId: podSandBoxID,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("RemovePodSandbox %q from runtime service failed: %v", podSandBoxID, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PodSandboxStatus returns the status of the PodSandbox.
|
|
func (r *RemoteRuntimeService) PodSandboxStatus(podSandBoxID string) (*runtimeapi.PodSandboxStatus, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{
|
|
PodSandboxId: podSandBoxID,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if resp.Status != nil {
|
|
if err := verifySandboxStatus(resp.Status); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return resp.Status, nil
|
|
}
|
|
|
|
// ListPodSandbox returns a list of PodSandboxes.
|
|
func (r *RemoteRuntimeService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{
|
|
Filter: filter,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("ListPodSandbox with filter %q from runtime service failed: %v", filter, err)
|
|
return nil, err
|
|
}
|
|
|
|
return resp.Items, nil
|
|
}
|
|
|
|
// CreateContainer creates a new container in the specified PodSandbox.
|
|
func (r *RemoteRuntimeService) CreateContainer(podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
|
|
PodSandboxId: podSandBoxID,
|
|
Config: config,
|
|
SandboxConfig: sandboxConfig,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("CreateContainer in sandbox %q from runtime service failed: %v", podSandBoxID, err)
|
|
return "", err
|
|
}
|
|
|
|
if resp.ContainerId == "" {
|
|
errorMessage := fmt.Sprintf("ContainerId is not set for container %q", config.GetMetadata())
|
|
glog.Errorf("CreateContainer failed: %s", errorMessage)
|
|
return "", errors.New(errorMessage)
|
|
}
|
|
|
|
return resp.ContainerId, nil
|
|
}
|
|
|
|
// StartContainer starts the container.
|
|
func (r *RemoteRuntimeService) StartContainer(containerID string) error {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
_, err := r.runtimeClient.StartContainer(ctx, &runtimeapi.StartContainerRequest{
|
|
ContainerId: containerID,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("StartContainer %q from runtime service failed: %v", containerID, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StopContainer stops a running container with a grace period (i.e., timeout).
|
|
func (r *RemoteRuntimeService) StopContainer(containerID string, timeout int64) error {
|
|
// Use timeout + default timeout (2 minutes) as timeout to leave extra time
|
|
// for SIGKILL container and request latency.
|
|
t := r.timeout + time.Duration(timeout)*time.Second
|
|
ctx, cancel := getContextWithTimeout(t)
|
|
defer cancel()
|
|
|
|
_, err := r.runtimeClient.StopContainer(ctx, &runtimeapi.StopContainerRequest{
|
|
ContainerId: containerID,
|
|
Timeout: timeout,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("StopContainer %q from runtime service failed: %v", containerID, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RemoveContainer removes the container. If the container is running, the container
|
|
// should be forced to removal.
|
|
func (r *RemoteRuntimeService) RemoveContainer(containerID string) error {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
_, err := r.runtimeClient.RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{
|
|
ContainerId: containerID,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("RemoveContainer %q from runtime service failed: %v", containerID, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListContainers lists containers by filters.
|
|
func (r *RemoteRuntimeService) ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.ListContainers(ctx, &runtimeapi.ListContainersRequest{
|
|
Filter: filter,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("ListContainers with filter %q from runtime service failed: %v", filter, err)
|
|
return nil, err
|
|
}
|
|
|
|
return resp.Containers, nil
|
|
}
|
|
|
|
// ContainerStatus returns the container status.
|
|
func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{
|
|
ContainerId: containerID,
|
|
})
|
|
if err != nil {
|
|
glog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err)
|
|
return nil, err
|
|
}
|
|
|
|
if resp.Status != nil {
|
|
if err := verifyContainerStatus(resp.Status); err != nil {
|
|
glog.Errorf("ContainerStatus of %q failed: %v", containerID, err)
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return resp.Status, nil
|
|
}
|
|
|
|
// ExecSync executes a command in the container, and returns the stdout output.
|
|
// If command exits with a non-zero exit code, an error is returned.
|
|
func (r *RemoteRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
|
|
ctx, cancel := getContextWithTimeout(timeout)
|
|
if timeout == 0 {
|
|
// Do not set timeout when timeout is 0.
|
|
ctx, cancel = getContextWithCancel()
|
|
}
|
|
defer cancel()
|
|
|
|
timeoutSeconds := int64(timeout.Seconds())
|
|
req := &runtimeapi.ExecSyncRequest{
|
|
ContainerId: containerID,
|
|
Cmd: cmd,
|
|
Timeout: timeoutSeconds,
|
|
}
|
|
resp, err := r.runtimeClient.ExecSync(ctx, req)
|
|
if err != nil {
|
|
glog.Errorf("ExecSync %s '%s' from runtime service failed: %v", containerID, strings.Join(cmd, " "), err)
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = nil
|
|
if resp.ExitCode != 0 {
|
|
err = utilexec.CodeExitError{
|
|
Err: fmt.Errorf("command '%s' exited with %d: %s", strings.Join(cmd, " "), resp.ExitCode, resp.Stderr),
|
|
Code: int(resp.ExitCode),
|
|
}
|
|
}
|
|
|
|
return resp.Stdout, resp.Stderr, err
|
|
}
|
|
|
|
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
|
|
func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.Exec(ctx, req)
|
|
if err != nil {
|
|
glog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
|
|
return nil, err
|
|
}
|
|
|
|
if resp.Url == "" {
|
|
errorMessage := "URL is not set"
|
|
glog.Errorf("Exec failed: %s", errorMessage)
|
|
return nil, errors.New(errorMessage)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
|
|
func (r *RemoteRuntimeService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.Attach(ctx, req)
|
|
if err != nil {
|
|
glog.Errorf("Attach %s from runtime service failed: %v", req.ContainerId, err)
|
|
return nil, err
|
|
}
|
|
|
|
if resp.Url == "" {
|
|
errorMessage := "URL is not set"
|
|
glog.Errorf("Exec failed: %s", errorMessage)
|
|
return nil, errors.New(errorMessage)
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
|
|
func (r *RemoteRuntimeService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.PortForward(ctx, req)
|
|
if err != nil {
|
|
glog.Errorf("PortForward %s from runtime service failed: %v", req.PodSandboxId, err)
|
|
return nil, err
|
|
}
|
|
|
|
if resp.Url == "" {
|
|
errorMessage := "URL is not set"
|
|
glog.Errorf("Exec failed: %s", errorMessage)
|
|
return nil, errors.New(errorMessage)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// UpdateRuntimeConfig updates the config of a runtime service. The only
|
|
// update payload currently supported is the pod CIDR assigned to a node,
|
|
// and the runtime service just proxies it down to the network plugin.
|
|
func (r *RemoteRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
// Response doesn't contain anything of interest. This translates to an
|
|
// Event notification to the network plugin, which can't fail, so we're
|
|
// really looking to surface destination unreachable.
|
|
_, err := r.runtimeClient.UpdateRuntimeConfig(ctx, &runtimeapi.UpdateRuntimeConfigRequest{
|
|
RuntimeConfig: runtimeConfig,
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Status returns the status of the runtime.
|
|
func (r *RemoteRuntimeService) Status() (*runtimeapi.RuntimeStatus, error) {
|
|
ctx, cancel := getContextWithTimeout(r.timeout)
|
|
defer cancel()
|
|
|
|
resp, err := r.runtimeClient.Status(ctx, &runtimeapi.StatusRequest{})
|
|
if err != nil {
|
|
glog.Errorf("Status from runtime service failed: %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
if resp.Status == nil || len(resp.Status.Conditions) < 2 {
|
|
errorMessage := "RuntimeReady or NetworkReady condition are not set"
|
|
glog.Errorf("Status failed: %s", errorMessage)
|
|
return nil, errors.New(errorMessage)
|
|
}
|
|
|
|
return resp.Status, nil
|
|
}
|
|
|
|
func (r *RemoteRuntimeService) ContainerStats(req *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) {
|
|
return nil, fmt.Errorf("Not implemented")
|
|
}
|
|
|
|
func (r *RemoteRuntimeService) ListContainerStats(req *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) {
|
|
return nil, fmt.Errorf("Not implemented")
|
|
}
|