Update runc to v1.0.0-rc91

https://github.com/opencontainers/runc/releases/tag/v1.0.0-rc91

Signed-off-by: Davanum Srinivas <davanum@gmail.com>
This commit is contained in:
Davanum Srinivas
2020-07-01 22:06:59 -04:00
parent c91c72c867
commit 963625d7bc
275 changed files with 9060 additions and 18508 deletions

View File

@@ -78,7 +78,7 @@ specifications as appropriate.
backport version of `libseccomp-dev` is required. See [travis.yml](.travis.yml) for an example on trusty.
* **btrfs development library.** Required by containerd btrfs support. `btrfs-tools`(Ubuntu, Debian) / `btrfs-progs-devel`(Fedora, CentOS, RHEL)
2. Install **`pkg-config`** (required for linking with `libseccomp`).
3. Install and setup a Go 1.13.11 development environment.
3. Install and setup a Go 1.13.12 development environment.
4. Make a local clone of this repository.
5. Install binary dependencies by running the following command from your cloned `cri/` project directory:
```bash

View File

@@ -37,7 +37,7 @@ import (
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/klog"
"k8s.io/klog/v2"
criconfig "github.com/containerd/cri/pkg/config"
"github.com/containerd/cri/pkg/constants"

View File

@@ -232,10 +232,10 @@ type PluginConfig struct {
// UnsetSeccompProfile is the profile containerd/cri will use If the provided seccomp profile is
// unset (`""`) for a container (default is `unconfined`)
UnsetSeccompProfile string `toml:"unset_seccomp_profile" json:"unsetSeccompProfile"`
// TolerateMissingHugePagesCgroupController if set to false will error out on create/update
// TolerateMissingHugetlbController if set to false will error out on create/update
// container requests with huge page limits if the cgroup controller for hugepages is not present.
// This helps with supporting Kubernetes <=1.18 out of the box. (default is `true`)
TolerateMissingHugePagesCgroupController bool `toml:"tolerate_missing_hugepages_controller" json:"tolerateMissingHugePagesCgroupController"`
TolerateMissingHugetlbController bool `toml:"tolerate_missing_hugetlb_controller" json:"tolerateMissingHugetlbController"`
// IgnoreImageDefinedVolumes ignores volumes defined by the image. Useful for better resource
// isolation, security and early detection of issues in the mount configuration when using
// ReadOnlyRootFilesystem since containers won't silently mount a temporary volume.

View File

@@ -20,7 +20,7 @@ package config
import (
"github.com/containerd/containerd"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"github.com/containerd/cri/pkg/streaming"
)
// DefaultConfig returns default configurations of cri plugin.
@@ -63,9 +63,9 @@ func DefaultConfig() PluginConfig {
},
},
},
MaxConcurrentDownloads: 3,
DisableProcMount: false,
TolerateMissingHugePagesCgroupController: true,
IgnoreImageDefinedVolumes: false,
MaxConcurrentDownloads: 3,
DisableProcMount: false,
TolerateMissingHugetlbController: true,
IgnoreImageDefinedVolumes: false,
}
}

View File

@@ -23,7 +23,7 @@ import (
"path/filepath"
"github.com/containerd/containerd"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"github.com/containerd/cri/pkg/streaming"
)
// DefaultConfig returns default configurations of cri plugin.

View File

@@ -324,7 +324,7 @@ func WithDevices(osi osinterface.OS, config *runtime.ContainerConfig) oci.SpecOp
Type: string(dev.Type),
Major: &dev.Major,
Minor: &dev.Minor,
Access: dev.Permissions,
Access: string(dev.Permissions),
})
}
return nil
@@ -408,7 +408,7 @@ func WithSelinuxLabels(process, mount string) oci.SpecOpts {
}
// WithResources sets the provided resource restrictions
func WithResources(resources *runtime.LinuxContainerResources, tolerateMissingHugePagesCgroupController bool) oci.SpecOpts {
func WithResources(resources *runtime.LinuxContainerResources, tolerateMissingHugetlbController bool) oci.SpecOpts {
return func(ctx context.Context, client oci.Client, c *containers.Container, s *runtimespec.Spec) (err error) {
if resources == nil {
return nil
@@ -451,7 +451,7 @@ func WithResources(resources *runtime.LinuxContainerResources, tolerateMissingHu
if limit != 0 {
s.Linux.Resources.Memory.Limit = &limit
}
if isHugePagesControllerPresent() {
if isHugetlbControllerPresent() {
for _, limit := range hugepages {
s.Linux.Resources.HugepageLimits = append(s.Linux.Resources.HugepageLimits, runtimespec.LinuxHugepageLimit{
Pagesize: limit.PageSize,
@@ -459,9 +459,9 @@ func WithResources(resources *runtime.LinuxContainerResources, tolerateMissingHu
})
}
} else {
if !tolerateMissingHugePagesCgroupController {
if !tolerateMissingHugetlbController {
return errors.Errorf("huge pages limits are specified but hugetlb cgroup controller is missing. " +
"Please set tolerate_missing_hugepages_controller to `true` to ignore this error")
"Please set tolerate_missing_hugetlb_controller to `true` to ignore this error")
}
logrus.Warn("hugetlb cgroup controller is absent. skipping huge pages limits")
}
@@ -474,7 +474,7 @@ var (
supportsHugetlb bool
)
func isHugePagesControllerPresent() bool {
func isHugetlbControllerPresent() bool {
supportsHugetlbOnce.Do(func() {
supportsHugetlb = false
if IsCgroup2UnifiedMode() {

View File

@@ -22,6 +22,7 @@ import (
"context"
"path/filepath"
"sort"
"strings"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/oci"
@@ -47,6 +48,20 @@ func WithWindowsNetworkNamespace(path string) oci.SpecOpts {
}
}
// namedPipePath returns true if the given path is to a named pipe.
func namedPipePath(p string) bool {
return strings.HasPrefix(p, `\\.\pipe\`)
}
// cleanMount returns a cleaned version of the mount path. The input is returned
// as-is if it is a named pipe path.
func cleanMount(p string) string {
if namedPipePath(p) {
return p
}
return filepath.Clean(p)
}
// WithWindowsMounts sorts and adds runtime and CRI mounts to the spec for
// windows container.
func WithWindowsMounts(osi osinterface.OS, config *runtime.ContainerConfig, extra []*runtime.Mount) oci.SpecOpts {
@@ -62,7 +77,7 @@ func WithWindowsMounts(osi osinterface.OS, config *runtime.ContainerConfig, extr
for _, e := range extra {
found := false
for _, c := range criMounts {
if filepath.Clean(e.ContainerPath) == filepath.Clean(c.ContainerPath) {
if cleanMount(e.ContainerPath) == cleanMount(c.ContainerPath) {
found = true
break
}
@@ -80,14 +95,14 @@ func WithWindowsMounts(osi osinterface.OS, config *runtime.ContainerConfig, extr
// mounts overridden by supplied mount;
mountSet := make(map[string]struct{})
for _, m := range mounts {
mountSet[filepath.Clean(m.ContainerPath)] = struct{}{}
mountSet[cleanMount(m.ContainerPath)] = struct{}{}
}
defaultMounts := s.Mounts
s.Mounts = nil
for _, m := range defaultMounts {
dst := filepath.Clean(m.Destination)
dst := cleanMount(m.Destination)
if _, ok := mountSet[dst]; ok {
// filter out mount overridden by a supplied mount
continue
@@ -100,17 +115,25 @@ func WithWindowsMounts(osi osinterface.OS, config *runtime.ContainerConfig, extr
dst = mount.GetContainerPath()
src = mount.GetHostPath()
)
// TODO(windows): Support special mount sources, e.g. named pipe.
// Create the host path if it doesn't exist.
if _, err := osi.Stat(src); err != nil {
// If the source doesn't exist, return an error instead
// of creating the source. This aligns with Docker's
// behavior on windows.
return errors.Wrapf(err, "failed to stat %q", src)
}
src, err := osi.ResolveSymbolicLink(src)
if err != nil {
return errors.Wrapf(err, "failed to resolve symlink %q", src)
// In the case of a named pipe mount on Windows, don't stat the file
// or do other operations that open it, as that could interfere with
// the listening process. filepath.Clean also breaks named pipe
// paths, so don't use it.
if !namedPipePath(src) {
if _, err := osi.Stat(src); err != nil {
// If the source doesn't exist, return an error instead
// of creating the source. This aligns with Docker's
// behavior on windows.
return errors.Wrapf(err, "failed to stat %q", src)
}
var err error
src, err = osi.ResolveSymbolicLink(src)
if err != nil {
return errors.Wrapf(err, "failed to resolve symlink %q", src)
}
// hcsshim requires clean path, especially '/' -> '\'.
src = filepath.Clean(src)
dst = filepath.Clean(dst)
}
var options []string
@@ -122,9 +145,8 @@ func WithWindowsMounts(osi osinterface.OS, config *runtime.ContainerConfig, extr
options = append(options, "rw")
}
s.Mounts = append(s.Mounts, runtimespec.Mount{
// hcsshim requires clean path, especially '/' -> '\'.
Source: filepath.Clean(src),
Destination: filepath.Clean(dst),
Source: src,
Destination: dst,
Options: options,
})
}

View File

@@ -0,0 +1,34 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package bandwidth provides utilities for bandwidth shaping
package bandwidth

View File

@@ -0,0 +1,72 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bandwidth
import (
"errors"
"k8s.io/apimachinery/pkg/api/resource"
)
// FakeShaper provides an implementation of the bandwith.Shaper.
// Beware this is implementation has no features besides Reset and GetCIDRs.
type FakeShaper struct {
CIDRs []string
ResetCIDRs []string
}
// Limit is not implemented
func (f *FakeShaper) Limit(cidr string, egress, ingress *resource.Quantity) error {
return errors.New("unimplemented")
}
// Reset appends a particular CIDR to the set of ResetCIDRs being managed by this shaper
func (f *FakeShaper) Reset(cidr string) error {
f.ResetCIDRs = append(f.ResetCIDRs, cidr)
return nil
}
// ReconcileInterface is not implemented
func (f *FakeShaper) ReconcileInterface() error {
return errors.New("unimplemented")
}
// ReconcileCIDR is not implemented
func (f *FakeShaper) ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error {
return errors.New("unimplemented")
}
// GetCIDRs returns the set of CIDRs that are being managed by this shaper
func (f *FakeShaper) GetCIDRs() ([]string, error) {
return f.CIDRs, nil
}

View File

@@ -0,0 +1,56 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bandwidth
import "k8s.io/apimachinery/pkg/api/resource"
// Shaper is designed so that the shaper structs created
// satisfy the Shaper interface.
type Shaper interface {
// Limit the bandwidth for a particular CIDR on a particular interface
// * ingress and egress are in bits/second
// * cidr is expected to be a valid network CIDR (e.g. '1.2.3.4/32' or '10.20.0.1/16')
// 'egress' bandwidth limit applies to all packets on the interface whose source matches 'cidr'
// 'ingress' bandwidth limit applies to all packets on the interface whose destination matches 'cidr'
// Limits are aggregate limits for the CIDR, not per IP address. CIDRs must be unique, but can be overlapping, traffic
// that matches multiple CIDRs counts against all limits.
Limit(cidr string, egress, ingress *resource.Quantity) error
// Remove a bandwidth limit for a particular CIDR on a particular network interface
Reset(cidr string) error
// Reconcile the interface managed by this shaper with the state on the ground.
ReconcileInterface() error
// Reconcile a CIDR managed by this shaper with the state on the ground
ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error
// GetCIDRs returns the set of CIDRs that are being managed by this shaper
GetCIDRs() ([]string, error)
}

View File

@@ -0,0 +1,361 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bandwidth
import (
"bufio"
"bytes"
"encoding/hex"
"fmt"
"net"
"regexp"
"strings"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/exec"
"k8s.io/klog/v2"
)
var (
classShowMatcher = regexp.MustCompile(`class htb (1:\d+)`)
classAndHandleMatcher = regexp.MustCompile(`filter parent 1:.*fh (\d+::\d+).*flowid (\d+:\d+)`)
)
// tcShaper provides an implementation of the Shaper interface on Linux using the 'tc' tool.
// In general, using this requires that the caller posses the NET_CAP_ADMIN capability, though if you
// do this within an container, it only requires the NS_CAPABLE capability for manipulations to that
// container's network namespace.
// Uses the hierarchical token bucket queuing discipline (htb), this requires Linux 2.4.20 or newer
// or a custom kernel with that queuing discipline backported.
type tcShaper struct {
e exec.Interface
iface string
}
// NewTCShaper makes a new tcShaper for the given interface
func NewTCShaper(iface string) Shaper {
shaper := &tcShaper{
e: exec.New(),
iface: iface,
}
return shaper
}
func (t *tcShaper) execAndLog(cmdStr string, args ...string) error {
klog.V(6).Infof("Running: %s %s", cmdStr, strings.Join(args, " "))
cmd := t.e.Command(cmdStr, args...)
out, err := cmd.CombinedOutput()
klog.V(6).Infof("Output from tc: %s", string(out))
return err
}
func (t *tcShaper) nextClassID() (int, error) {
data, err := t.e.Command("tc", "class", "show", "dev", t.iface).CombinedOutput()
if err != nil {
return -1, err
}
scanner := bufio.NewScanner(bytes.NewBuffer(data))
classes := sets.String{}
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
// skip empty lines
if len(line) == 0 {
continue
}
// expected tc line:
// class htb 1:1 root prio 0 rate 1000Kbit ceil 1000Kbit burst 1600b cburst 1600b
matches := classShowMatcher.FindStringSubmatch(line)
if len(matches) != 2 {
return -1, fmt.Errorf("unexpected output from tc: %s (%v)", scanner.Text(), matches)
}
classes.Insert(matches[1])
}
// Make sure it doesn't go forever
for nextClass := 1; nextClass < 10000; nextClass++ {
if !classes.Has(fmt.Sprintf("1:%d", nextClass)) {
return nextClass, nil
}
}
// This should really never happen
return -1, fmt.Errorf("exhausted class space, please try again")
}
// Convert a CIDR from text to a hex representation
// Strips any masked parts of the IP, so 1.2.3.4/16 becomes hex(1.2.0.0)/ffffffff
func hexCIDR(cidr string) (string, error) {
ip, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return "", err
}
ip = ip.Mask(ipnet.Mask)
hexIP := hex.EncodeToString([]byte(ip))
hexMask := ipnet.Mask.String()
return hexIP + "/" + hexMask, nil
}
// Convert a CIDR from hex representation to text, opposite of the above.
func asciiCIDR(cidr string) (string, error) {
parts := strings.Split(cidr, "/")
if len(parts) != 2 {
return "", fmt.Errorf("unexpected CIDR format: %s", cidr)
}
ipData, err := hex.DecodeString(parts[0])
if err != nil {
return "", err
}
ip := net.IP(ipData)
maskData, err := hex.DecodeString(parts[1])
if err != nil {
return "", err
}
mask := net.IPMask(maskData)
size, _ := mask.Size()
return fmt.Sprintf("%s/%d", ip.String(), size), nil
}
func (t *tcShaper) findCIDRClass(cidr string) (classAndHandleList [][]string, found bool, err error) {
data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput()
if err != nil {
return classAndHandleList, false, err
}
hex, err := hexCIDR(cidr)
if err != nil {
return classAndHandleList, false, err
}
spec := fmt.Sprintf("match %s", hex)
scanner := bufio.NewScanner(bytes.NewBuffer(data))
filter := ""
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if len(line) == 0 {
continue
}
if strings.HasPrefix(line, "filter") {
filter = line
continue
}
if strings.Contains(line, spec) {
// expected tc line:
// `filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1` (old version) or
// `filter parent 1: protocol ip pref 1 u32 chain 0 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1 not_in_hw` (new version)
matches := classAndHandleMatcher.FindStringSubmatch(filter)
if len(matches) != 3 {
return classAndHandleList, false, fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(matches), matches)
}
resultTmp := []string{matches[2], matches[1]}
classAndHandleList = append(classAndHandleList, resultTmp)
}
}
if len(classAndHandleList) > 0 {
return classAndHandleList, true, nil
}
return classAndHandleList, false, nil
}
func makeKBitString(rsrc *resource.Quantity) string {
return fmt.Sprintf("%dkbit", (rsrc.Value() / 1000))
}
func (t *tcShaper) makeNewClass(rate string) (int, error) {
class, err := t.nextClassID()
if err != nil {
return -1, err
}
if err := t.execAndLog("tc", "class", "add",
"dev", t.iface,
"parent", "1:",
"classid", fmt.Sprintf("1:%d", class),
"htb", "rate", rate); err != nil {
return -1, err
}
return class, nil
}
func (t *tcShaper) Limit(cidr string, upload, download *resource.Quantity) (err error) {
var downloadClass, uploadClass int
if download != nil {
if downloadClass, err = t.makeNewClass(makeKBitString(download)); err != nil {
return err
}
if err := t.execAndLog("tc", "filter", "add",
"dev", t.iface,
"protocol", "ip",
"parent", "1:0",
"prio", "1", "u32",
"match", "ip", "dst", cidr,
"flowid", fmt.Sprintf("1:%d", downloadClass)); err != nil {
return err
}
}
if upload != nil {
if uploadClass, err = t.makeNewClass(makeKBitString(upload)); err != nil {
return err
}
if err := t.execAndLog("tc", "filter", "add",
"dev", t.iface,
"protocol", "ip",
"parent", "1:0",
"prio", "1", "u32",
"match", "ip", "src", cidr,
"flowid", fmt.Sprintf("1:%d", uploadClass)); err != nil {
return err
}
}
return nil
}
// tests to see if an interface exists, if it does, return true and the status line for the interface
// returns false, "", <err> if an error occurs.
func (t *tcShaper) interfaceExists() (bool, string, error) {
data, err := t.e.Command("tc", "qdisc", "show", "dev", t.iface).CombinedOutput()
if err != nil {
return false, "", err
}
value := strings.TrimSpace(string(data))
if len(value) == 0 {
return false, "", nil
}
// Newer versions of tc and/or the kernel return the following instead of nothing:
// qdisc noqueue 0: root refcnt 2
fields := strings.Fields(value)
if len(fields) > 1 && fields[1] == "noqueue" {
return false, "", nil
}
return true, value, nil
}
func (t *tcShaper) ReconcileCIDR(cidr string, upload, download *resource.Quantity) error {
_, found, err := t.findCIDRClass(cidr)
if err != nil {
return err
}
if !found {
return t.Limit(cidr, upload, download)
}
// TODO: actually check bandwidth limits here
return nil
}
func (t *tcShaper) ReconcileInterface() error {
exists, output, err := t.interfaceExists()
if err != nil {
return err
}
if !exists {
klog.V(4).Info("Didn't find bandwidth interface, creating")
return t.initializeInterface()
}
fields := strings.Split(output, " ")
if len(fields) < 12 || fields[1] != "htb" || fields[2] != "1:" {
if err := t.deleteInterface(fields[2]); err != nil {
return err
}
return t.initializeInterface()
}
return nil
}
func (t *tcShaper) initializeInterface() error {
return t.execAndLog("tc", "qdisc", "add", "dev", t.iface, "root", "handle", "1:", "htb", "default", "30")
}
func (t *tcShaper) Reset(cidr string) error {
classAndHandle, found, err := t.findCIDRClass(cidr)
if err != nil {
return err
}
if !found {
return fmt.Errorf("Failed to find cidr: %s on interface: %s", cidr, t.iface)
}
for i := 0; i < len(classAndHandle); i++ {
if err := t.execAndLog("tc", "filter", "del",
"dev", t.iface,
"parent", "1:",
"proto", "ip",
"prio", "1",
"handle", classAndHandle[i][1], "u32"); err != nil {
return err
}
if err := t.execAndLog("tc", "class", "del",
"dev", t.iface,
"parent", "1:",
"classid", classAndHandle[i][0]); err != nil {
return err
}
}
return nil
}
func (t *tcShaper) deleteInterface(class string) error {
return t.execAndLog("tc", "qdisc", "delete", "dev", t.iface, "root", "handle", class)
}
func (t *tcShaper) GetCIDRs() ([]string, error) {
data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput()
if err != nil {
return nil, err
}
result := []string{}
scanner := bufio.NewScanner(bytes.NewBuffer(data))
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if len(line) == 0 {
continue
}
if strings.Contains(line, "match") {
parts := strings.Split(line, " ")
// expected tc line:
// match <cidr> at <number>
if len(parts) != 4 {
return nil, fmt.Errorf("unexpected output: %v", parts)
}
cidr, err := asciiCIDR(parts[1])
if err != nil {
return nil, err
}
result = append(result, cidr)
}
}
return result, nil
}

View File

@@ -0,0 +1,69 @@
// +build !linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bandwidth
import (
"errors"
"k8s.io/apimachinery/pkg/api/resource"
)
type unsupportedShaper struct {
}
// NewTCShaper makes a new unsupportedShapper for the given interface
func NewTCShaper(iface string) Shaper {
return &unsupportedShaper{}
}
func (f *unsupportedShaper) Limit(cidr string, egress, ingress *resource.Quantity) error {
return errors.New("unimplemented")
}
func (f *unsupportedShaper) Reset(cidr string) error {
return nil
}
func (f *unsupportedShaper) ReconcileInterface() error {
return errors.New("unimplemented")
}
func (f *unsupportedShaper) ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error {
return errors.New("unimplemented")
}
func (f *unsupportedShaper) GetCIDRs() ([]string, error) {
return []string{}, nil
}

View File

@@ -0,0 +1,82 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bandwidth
import (
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
)
var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")
func validateBandwidthIsReasonable(rsrc *resource.Quantity) error {
if rsrc.Value() < minRsrc.Value() {
return fmt.Errorf("resource is unreasonably small (< 1kbit)")
}
if rsrc.Value() > maxRsrc.Value() {
return fmt.Errorf("resoruce is unreasonably large (> 1Pbit)")
}
return nil
}
// ExtractPodBandwidthResources extracts the ingress and egress from the given pod annotations
func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) {
if podAnnotations == nil {
return nil, nil, nil
}
str, found := podAnnotations["kubernetes.io/ingress-bandwidth"]
if found {
ingressValue, err := resource.ParseQuantity(str)
if err != nil {
return nil, nil, err
}
ingress = &ingressValue
if err := validateBandwidthIsReasonable(ingress); err != nil {
return nil, nil, err
}
}
str, found = podAnnotations["kubernetes.io/egress-bandwidth"]
if found {
egressValue, err := resource.ParseQuantity(str)
if err != nil {
return nil, nil, err
}
egress = &egressValue
if err := validateBandwidthIsReasonable(egress); err != nil {
return nil, nil, err
}
}
return ingress, egress, nil
}

View File

@@ -225,7 +225,7 @@ func (c *criService) containerSpec(id string, sandboxID string, sandboxPid uint3
if c.config.DisableCgroup {
specOpts = append(specOpts, customopts.WithDisabledCgroups)
} else {
specOpts = append(specOpts, customopts.WithResources(config.GetLinux().GetResources(), c.config.TolerateMissingHugePagesCgroupController))
specOpts = append(specOpts, customopts.WithResources(config.GetLinux().GetResources(), c.config.TolerateMissingHugetlbController))
if sandboxConfig.GetLinux().GetCgroupParent() != "" {
cgroupsPath := getCgroupsPath(sandboxConfig.GetLinux().GetCgroupParent(), id)
specOpts = append(specOpts, oci.WithCgroup(cgroupsPath))

View File

@@ -73,7 +73,7 @@ func (c *criService) updateContainerResources(ctx context.Context,
return errors.Wrap(err, "failed to get container spec")
}
newSpec, err := updateOCILinuxResource(ctx, oldSpec, resources,
c.config.TolerateMissingHugePagesCgroupController)
c.config.TolerateMissingHugetlbController)
if err != nil {
return errors.Wrap(err, "failed to update resource in spec")
}
@@ -134,7 +134,7 @@ func updateContainerSpec(ctx context.Context, cntr containerd.Container, spec *r
// updateOCILinuxResource updates container resource limit.
func updateOCILinuxResource(ctx context.Context, spec *runtimespec.Spec, new *runtime.LinuxContainerResources,
tolerateMissingHugePagesCgroupController bool) (*runtimespec.Spec, error) {
tolerateMissingHugetlbController bool) (*runtimespec.Spec, error) {
// Copy to make sure old spec is not changed.
var cloned runtimespec.Spec
if err := util.DeepCopy(&cloned, spec); err != nil {
@@ -143,7 +143,7 @@ func updateOCILinuxResource(ctx context.Context, spec *runtimespec.Spec, new *ru
if cloned.Linux == nil {
cloned.Linux = &runtimespec.Linux{}
}
if err := opts.WithResources(new, tolerateMissingHugePagesCgroupController)(ctx, nil, nil, &cloned); err != nil {
if err := opts.WithResources(new, tolerateMissingHugetlbController)(ctx, nil, nil, &cloned); err != nil {
return nil, errors.Wrap(err, "unable to set linux container resources")
}
return &cloned, nil

View File

@@ -284,7 +284,7 @@ func (c *criService) getTLSConfig(registryTLSConfig criconfig.TLSConfig) (*tls.C
if len(cert.Certificate) != 0 {
tlsConfig.Certificates = []tls.Certificate{cert}
}
tlsConfig.BuildNameToCertificate()
tlsConfig.BuildNameToCertificate() // nolint:staticcheck
}
if registryTLSConfig.CAFile != "" {

View File

@@ -33,13 +33,13 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/util/bandwidth"
"github.com/containerd/cri/pkg/annotations"
criconfig "github.com/containerd/cri/pkg/config"
customopts "github.com/containerd/cri/pkg/containerd/opts"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/netns"
"github.com/containerd/cri/pkg/server/bandwidth"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
"github.com/containerd/cri/pkg/util"
selinux "github.com/opencontainers/selinux/go-selinux"

View File

@@ -28,12 +28,12 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/plugin"
"github.com/containerd/cri/pkg/streaming"
cni "github.com/containerd/go-cni"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"github.com/containerd/cri/pkg/store/label"

View File

@@ -30,10 +30,10 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/remotecommand"
k8scert "k8s.io/client-go/util/cert"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/utils/exec"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/streaming"
)
type streamListenerMode int

View File

@@ -17,8 +17,11 @@
package sandbox
import (
"strconv"
"sync"
"time"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)
// The sandbox state machine in the CRI plugin:
@@ -63,7 +66,7 @@ type State uint32
const (
// StateReady is ready state, it means sandbox container
// is running.
StateReady = iota
StateReady State = iota
// StateNotReady is notready state, it ONLY means sandbox
// container is not running.
// StopPodSandbox should still be called for NOTREADY sandbox to
@@ -75,6 +78,21 @@ const (
StateUnknown
)
// String returns the string representation of the state
func (s State) String() string {
switch s {
case StateReady:
return runtime.PodSandboxState_SANDBOX_READY.String()
case StateNotReady:
return runtime.PodSandboxState_SANDBOX_NOTREADY.String()
case StateUnknown:
// PodSandboxState doesn't have an unknown state, but State does, so return a string using the same convention
return "SANDBOX_UNKNOWN"
default:
return "invalid sandbox state value: " + strconv.Itoa(int(s))
}
}
// Status is the status of a sandbox.
type Status struct {
// Pid is the init process id of the sandbox container.

View File

@@ -0,0 +1,72 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"net/http"
"strconv"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)
// NewErrorStreamingDisabled creates an error for disabled streaming method.
func NewErrorStreamingDisabled(method string) error {
return grpcstatus.Errorf(codes.NotFound, "streaming method %s disabled", method)
}
// NewErrorTooManyInFlight creates an error for exceeding the maximum number of in-flight requests.
func NewErrorTooManyInFlight() error {
return grpcstatus.Error(codes.ResourceExhausted, "maximum number of in-flight requests exceeded")
}
// WriteError translates a CRI streaming error into an appropriate HTTP response.
func WriteError(err error, w http.ResponseWriter) error {
s, _ := grpcstatus.FromError(err)
var status int
switch s.Code() {
case codes.NotFound:
status = http.StatusNotFound
case codes.ResourceExhausted:
// We only expect to hit this if there is a DoS, so we just wait the full TTL.
// If this is ever hit in steady-state operations, consider increasing the maxInFlight requests,
// or plumbing through the time to next expiration.
w.Header().Set("Retry-After", strconv.Itoa(int(cacheTTL.Seconds())))
status = http.StatusTooManyRequests
default:
status = http.StatusInternalServerError
}
w.WriteHeader(status)
_, writeErr := w.Write([]byte(err.Error()))
return writeErr
}

View File

@@ -0,0 +1,40 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package portforward contains server-side logic for handling port forwarding requests.
package portforward
// ProtocolV1Name is the name of the subprotocol used for port forwarding.
const ProtocolV1Name = "portforward.k8s.io"
// SupportedProtocols are the supported port forwarding protocols.
var SupportedProtocols = []string{ProtocolV1Name}

View File

@@ -0,0 +1,315 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"errors"
"fmt"
"net/http"
"strconv"
"sync"
"time"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)
func handleHTTPStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
_, err := httpstream.Handshake(req, w, supportedPortForwardProtocols)
// negotiated protocol isn't currently used server side, but could be in the future
if err != nil {
// Handshake writes the error to the client
return err
}
streamChan := make(chan httpstream.Stream, 1)
klog.V(5).Infof("Upgrading port forward response")
upgrader := spdy.NewResponseUpgrader()
conn := upgrader.UpgradeResponse(w, req, httpStreamReceived(streamChan))
if conn == nil {
return errors.New("unable to upgrade httpstream connection")
}
defer conn.Close()
klog.V(5).Infof("(conn=%p) setting port forwarding streaming connection idle timeout to %v", conn, idleTimeout)
conn.SetIdleTimeout(idleTimeout)
h := &httpStreamHandler{
conn: conn,
streamChan: streamChan,
streamPairs: make(map[string]*httpStreamPair),
streamCreationTimeout: streamCreationTimeout,
pod: podName,
uid: uid,
forwarder: portForwarder,
}
h.run()
return nil
}
// httpStreamReceived is the httpstream.NewStreamHandler for port
// forward streams. It checks each stream's port and stream type headers,
// rejecting any streams that with missing or invalid values. Each valid
// stream is sent to the streams channel.
func httpStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream, <-chan struct{}) error {
return func(stream httpstream.Stream, replySent <-chan struct{}) error {
// make sure it has a valid port header
portString := stream.Headers().Get(api.PortHeader)
if len(portString) == 0 {
return fmt.Errorf("%q header is required", api.PortHeader)
}
port, err := strconv.ParseUint(portString, 10, 16)
if err != nil {
return fmt.Errorf("unable to parse %q as a port: %v", portString, err)
}
if port < 1 {
return fmt.Errorf("port %q must be > 0", portString)
}
// make sure it has a valid stream type header
streamType := stream.Headers().Get(api.StreamType)
if len(streamType) == 0 {
return fmt.Errorf("%q header is required", api.StreamType)
}
if streamType != api.StreamTypeError && streamType != api.StreamTypeData {
return fmt.Errorf("invalid stream type %q", streamType)
}
streams <- stream
return nil
}
}
// httpStreamHandler is capable of processing multiple port forward
// requests over a single httpstream.Connection.
type httpStreamHandler struct {
conn httpstream.Connection
streamChan chan httpstream.Stream
streamPairsLock sync.RWMutex
streamPairs map[string]*httpStreamPair
streamCreationTimeout time.Duration
pod string
uid types.UID
forwarder PortForwarder
}
// getStreamPair returns a httpStreamPair for requestID. This creates a
// new pair if one does not yet exist for the requestID. The returned bool is
// true if the pair was created.
func (h *httpStreamHandler) getStreamPair(requestID string) (*httpStreamPair, bool) {
h.streamPairsLock.Lock()
defer h.streamPairsLock.Unlock()
if p, ok := h.streamPairs[requestID]; ok {
klog.V(5).Infof("(conn=%p, request=%s) found existing stream pair", h.conn, requestID)
return p, false
}
klog.V(5).Infof("(conn=%p, request=%s) creating new stream pair", h.conn, requestID)
p := newPortForwardPair(requestID)
h.streamPairs[requestID] = p
return p, true
}
// monitorStreamPair waits for the pair to receive both its error and data
// streams, or for the timeout to expire (whichever happens first), and then
// removes the pair.
func (h *httpStreamHandler) monitorStreamPair(p *httpStreamPair, timeout <-chan time.Time) {
select {
case <-timeout:
err := fmt.Errorf("(conn=%v, request=%s) timed out waiting for streams", h.conn, p.requestID)
utilruntime.HandleError(err)
p.printError(err.Error())
case <-p.complete:
klog.V(5).Infof("(conn=%v, request=%s) successfully received error and data streams", h.conn, p.requestID)
}
h.removeStreamPair(p.requestID)
}
// removeStreamPair removes the stream pair identified by requestID from streamPairs.
func (h *httpStreamHandler) removeStreamPair(requestID string) {
h.streamPairsLock.Lock()
defer h.streamPairsLock.Unlock()
delete(h.streamPairs, requestID)
}
// requestID returns the request id for stream.
func (h *httpStreamHandler) requestID(stream httpstream.Stream) string {
requestID := stream.Headers().Get(api.PortForwardRequestIDHeader)
if len(requestID) == 0 {
klog.V(5).Infof("(conn=%p) stream received without %s header", h.conn, api.PortForwardRequestIDHeader)
// If we get here, it's because the connection came from an older client
// that isn't generating the request id header
// (https://github.com/kubernetes/kubernetes/blob/843134885e7e0b360eb5441e85b1410a8b1a7a0c/pkg/client/unversioned/portforward/portforward.go#L258-L287)
//
// This is a best-effort attempt at supporting older clients.
//
// When there aren't concurrent new forwarded connections, each connection
// will have a pair of streams (data, error), and the stream IDs will be
// consecutive odd numbers, e.g. 1 and 3 for the first connection. Convert
// the stream ID into a pseudo-request id by taking the stream type and
// using id = stream.Identifier() when the stream type is error,
// and id = stream.Identifier() - 2 when it's data.
//
// NOTE: this only works when there are not concurrent new streams from
// multiple forwarded connections; it's a best-effort attempt at supporting
// old clients that don't generate request ids. If there are concurrent
// new connections, it's possible that 1 connection gets streams whose IDs
// are not consecutive (e.g. 5 and 9 instead of 5 and 7).
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
requestID = strconv.Itoa(int(stream.Identifier()))
case api.StreamTypeData:
requestID = strconv.Itoa(int(stream.Identifier()) - 2)
}
klog.V(5).Infof("(conn=%p) automatically assigning request ID=%q from stream type=%s, stream ID=%d", h.conn, requestID, streamType, stream.Identifier())
}
return requestID
}
// run is the main loop for the httpStreamHandler. It processes new
// streams, invoking portForward for each complete stream pair. The loop exits
// when the httpstream.Connection is closed.
func (h *httpStreamHandler) run() {
klog.V(5).Infof("(conn=%p) waiting for port forward streams", h.conn)
Loop:
for {
select {
case <-h.conn.CloseChan():
klog.V(5).Infof("(conn=%p) upgraded connection closed", h.conn)
break Loop
case stream := <-h.streamChan:
requestID := h.requestID(stream)
streamType := stream.Headers().Get(api.StreamType)
klog.V(5).Infof("(conn=%p, request=%s) received new stream of type %s", h.conn, requestID, streamType)
p, created := h.getStreamPair(requestID)
if created {
go h.monitorStreamPair(p, time.After(h.streamCreationTimeout))
}
if complete, err := p.add(stream); err != nil {
msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
utilruntime.HandleError(errors.New(msg))
p.printError(msg)
} else if complete {
go h.portForward(p)
}
}
}
}
// portForward invokes the httpStreamHandler's forwarder.PortForward
// function for the given stream pair.
func (h *httpStreamHandler) portForward(p *httpStreamPair) {
defer p.dataStream.Close()
defer p.errorStream.Close()
portString := p.dataStream.Headers().Get(api.PortHeader)
port, _ := strconv.ParseInt(portString, 10, 32)
klog.V(5).Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
err := h.forwarder.PortForward(h.pod, h.uid, int32(port), p.dataStream)
klog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
if err != nil {
msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err)
utilruntime.HandleError(msg)
fmt.Fprint(p.errorStream, msg.Error())
}
}
// httpStreamPair represents the error and data streams for a port
// forwarding request.
type httpStreamPair struct {
lock sync.RWMutex
requestID string
dataStream httpstream.Stream
errorStream httpstream.Stream
complete chan struct{}
}
// newPortForwardPair creates a new httpStreamPair.
func newPortForwardPair(requestID string) *httpStreamPair {
return &httpStreamPair{
requestID: requestID,
complete: make(chan struct{}),
}
}
// add adds the stream to the httpStreamPair. If the pair already
// contains a stream for the new stream's type, an error is returned. add
// returns true if both the data and error streams for this pair have been
// received.
func (p *httpStreamPair) add(stream httpstream.Stream) (bool, error) {
p.lock.Lock()
defer p.lock.Unlock()
switch stream.Headers().Get(api.StreamType) {
case api.StreamTypeError:
if p.errorStream != nil {
return false, errors.New("error stream already assigned")
}
p.errorStream = stream
case api.StreamTypeData:
if p.dataStream != nil {
return false, errors.New("data stream already assigned")
}
p.dataStream = stream
}
complete := p.errorStream != nil && p.dataStream != nil
if complete {
close(p.complete)
}
return complete, nil
}
// printError writes s to p.errorStream if p.errorStream has been set.
func (p *httpStreamPair) printError(s string) {
p.lock.RLock()
defer p.lock.RUnlock()
if p.errorStream != nil {
fmt.Fprint(p.errorStream, s)
}
}

View File

@@ -0,0 +1,69 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"io"
"net/http"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/wsstream"
)
// PortForwarder knows how to forward content from a data stream to/from a port
// in a pod.
type PortForwarder interface {
// PortForwarder copies data between a data stream and a port in a pod.
PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
}
// ServePortForward handles a port forwarding request. A single request is
// kept alive as long as the client is still alive and the connection has not
// been timed out due to idleness. This function handles multiple forwarded
// connections; i.e., multiple `curl http://localhost:8888/` requests will be
// handled by a single invocation of ServePortForward.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
var err error
if wsstream.IsWebSocketRequest(req) {
err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
} else {
err = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
}
if err != nil {
runtime.HandleError(err)
return
}
}

View File

@@ -0,0 +1,213 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"encoding/binary"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"
"k8s.io/klog/v2"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/util/wsstream"
)
const (
dataChannel = iota
errorChannel
v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
)
// V4Options contains details about which streams are required for port
// forwarding.
// All fields included in V4Options need to be expressed explicitly in the
// CRI (k8s.io/cri-api/pkg/apis/{version}/api.proto) PortForwardRequest.
type V4Options struct {
Ports []int32
}
// NewV4Options creates a new options from the Request.
func NewV4Options(req *http.Request) (*V4Options, error) {
if !wsstream.IsWebSocketRequest(req) {
return &V4Options{}, nil
}
portStrings := req.URL.Query()[api.PortHeader]
if len(portStrings) == 0 {
return nil, fmt.Errorf("query parameter %q is required", api.PortHeader)
}
ports := make([]int32, 0, len(portStrings))
for _, portString := range portStrings {
if len(portString) == 0 {
return nil, fmt.Errorf("query parameter %q cannot be empty", api.PortHeader)
}
for _, p := range strings.Split(portString, ",") {
port, err := strconv.ParseUint(p, 10, 16)
if err != nil {
return nil, fmt.Errorf("unable to parse %q as a port: %v", portString, err)
}
if port < 1 {
return nil, fmt.Errorf("port %q must be > 0", portString)
}
ports = append(ports, int32(port))
}
}
return &V4Options{
Ports: ports,
}, nil
}
// BuildV4Options returns a V4Options based on the given information.
func BuildV4Options(ports []int32) (*V4Options, error) {
return &V4Options{Ports: ports}, nil
}
// handleWebSocketStreams handles requests to forward ports to a pod via
// a PortForwarder. A pair of streams are created per port (DATA n,
// ERROR n+1). The associated port is written to each stream as a unsigned 16
// bit integer in little endian format.
func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, opts *V4Options, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
channels := make([]wsstream.ChannelType, 0, len(opts.Ports)*2)
for i := 0; i < len(opts.Ports); i++ {
channels = append(channels, wsstream.ReadWriteChannel, wsstream.WriteChannel)
}
conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
"": {
Binary: true,
Channels: channels,
},
v4BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
v4Base64WebsocketProtocol: {
Binary: false,
Channels: channels,
},
})
conn.SetIdleTimeout(idleTimeout)
_, streams, err := conn.Open(httplog.Unlogged(req, w), req)
if err != nil {
err = fmt.Errorf("unable to upgrade websocket connection: %v", err)
return err
}
defer conn.Close()
streamPairs := make([]*websocketStreamPair, len(opts.Ports))
for i := range streamPairs {
streamPair := websocketStreamPair{
port: opts.Ports[i],
dataStream: streams[i*2+dataChannel],
errorStream: streams[i*2+errorChannel],
}
streamPairs[i] = &streamPair
portBytes := make([]byte, 2)
// port is always positive so conversion is allowable
binary.LittleEndian.PutUint16(portBytes, uint16(streamPair.port))
streamPair.dataStream.Write(portBytes)
streamPair.errorStream.Write(portBytes)
}
h := &websocketStreamHandler{
conn: conn,
streamPairs: streamPairs,
pod: podName,
uid: uid,
forwarder: portForwarder,
}
h.run()
return nil
}
// websocketStreamPair represents the error and data streams for a port
// forwarding request.
type websocketStreamPair struct {
port int32
dataStream io.ReadWriteCloser
errorStream io.WriteCloser
}
// websocketStreamHandler is capable of processing a single port forward
// request over a websocket connection
type websocketStreamHandler struct {
conn *wsstream.Conn
streamPairs []*websocketStreamPair
pod string
uid types.UID
forwarder PortForwarder
}
// run invokes the websocketStreamHandler's forwarder.PortForward
// function for the given stream pair.
func (h *websocketStreamHandler) run() {
wg := sync.WaitGroup{}
wg.Add(len(h.streamPairs))
for _, pair := range h.streamPairs {
p := pair
go func() {
defer wg.Done()
h.portForward(p)
}()
}
wg.Wait()
}
func (h *websocketStreamHandler) portForward(p *websocketStreamPair) {
defer p.dataStream.Close()
defer p.errorStream.Close()
klog.V(5).Infof("(conn=%p) invoking forwarder.PortForward for port %d", h.conn, p.port)
err := h.forwarder.PortForward(h.pod, h.uid, p.port, p.dataStream)
klog.V(5).Infof("(conn=%p) done invoking forwarder.PortForward for port %d", h.conn, p.port)
if err != nil {
msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", p.port, h.pod, h.uid, err)
runtime.HandleError(msg)
fmt.Fprint(p.errorStream, msg.Error())
}
}

View File

@@ -0,0 +1,75 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"fmt"
"io"
"net/http"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/remotecommand"
)
// Attacher knows how to attach to a running container in a pod.
type Attacher interface {
// AttachContainer attaches to the running container in the pod, copying data between in/out/err
// and the container's stdin/stdout/stderr.
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
}
// ServeAttach handles requests to attach to a container. After creating/receiving the required
// streams, it delegates the actual attaching to attacher.
func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
if !ok {
// error is handled by createStreams
return
}
defer ctx.conn.Close()
err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
if err != nil {
err = fmt.Errorf("error attaching to container: %v", err)
runtime.HandleError(err)
ctx.writeStatus(apierrors.NewInternalError(err))
} else {
ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusSuccess,
}})
}
}

View File

@@ -0,0 +1,34 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package remotecommand contains functions related to executing commands in and attaching to pods.
package remotecommand

View File

@@ -0,0 +1,95 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"fmt"
"io"
"net/http"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/remotecommand"
utilexec "k8s.io/utils/exec"
)
// Executor knows how to execute a command in a container in a pod.
type Executor interface {
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
}
// ServeExec handles requests to execute a command in a container. After
// creating/receiving the required streams, it delegates the actual execution
// to the executor.
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
if !ok {
// error is handled by createStreams
return
}
defer ctx.conn.Close()
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
if err != nil {
if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
rc := exitErr.ExitStatus()
ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Reason: remotecommandconsts.NonZeroExitCodeReason,
Details: &metav1.StatusDetails{
Causes: []metav1.StatusCause{
{
Type: remotecommandconsts.ExitCodeCauseType,
Message: fmt.Sprintf("%d", rc),
},
},
},
Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
}})
} else {
err = fmt.Errorf("error executing command in container: %v", err)
runtime.HandleError(err)
ctx.writeStatus(apierrors.NewInternalError(err))
}
} else {
ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusSuccess,
}})
}
}

View File

@@ -0,0 +1,463 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/wsstream"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/klog/v2"
)
// Options contains details about which streams are required for
// remote command execution.
type Options struct {
Stdin bool
Stdout bool
Stderr bool
TTY bool
}
// NewOptions creates a new Options from the Request.
func NewOptions(req *http.Request) (*Options, error) {
tty := req.FormValue(api.ExecTTYParam) == "1"
stdin := req.FormValue(api.ExecStdinParam) == "1"
stdout := req.FormValue(api.ExecStdoutParam) == "1"
stderr := req.FormValue(api.ExecStderrParam) == "1"
if tty && stderr {
// TODO: make this an error before we reach this method
klog.V(4).Infof("Access to exec with tty and stderr is not supported, bypassing stderr")
stderr = false
}
if !stdin && !stdout && !stderr {
return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr")
}
return &Options{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
TTY: tty,
}, nil
}
// context contains the connection and streams used when
// forwarding an attach or execute session into a container.
type context struct {
conn io.Closer
stdinStream io.ReadCloser
stdoutStream io.WriteCloser
stderrStream io.WriteCloser
writeStatus func(status *apierrors.StatusError) error
resizeStream io.ReadCloser
resizeChan chan remotecommand.TerminalSize
tty bool
}
// streamAndReply holds both a Stream and a channel that is closed when the stream's reply frame is
// enqueued. Consumers can wait for replySent to be closed prior to proceeding, to ensure that the
// replyFrame is enqueued before the connection's goaway frame is sent (e.g. if a stream was
// received and right after, the connection gets closed).
type streamAndReply struct {
httpstream.Stream
replySent <-chan struct{}
}
// waitStreamReply waits until either replySent or stop is closed. If replySent is closed, it sends
// an empty struct to the notify channel.
func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-chan struct{}) {
select {
case <-replySent:
notify <- struct{}{}
case <-stop:
}
}
func createStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
var ctx *context
var ok bool
if wsstream.IsWebSocketRequest(req) {
ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout)
} else {
ctx, ok = createHTTPStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout)
}
if !ok {
return nil, false
}
if ctx.resizeStream != nil {
ctx.resizeChan = make(chan remotecommand.TerminalSize)
go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
}
return ctx, true
}
func createHTTPStreamStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return nil, false
}
streamCh := make(chan streamAndReply)
upgrader := spdy.NewResponseUpgrader()
conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
streamCh <- streamAndReply{Stream: stream, replySent: replySent}
return nil
})
// from this point on, we can no longer call methods on response
if conn == nil {
// The upgrader is responsible for notifying the client of any errors that
// occurred during upgrading. All we can do is return here at this point
// if we weren't successful in upgrading.
return nil, false
}
conn.SetIdleTimeout(idleTimeout)
var handler protocolHandler
switch protocol {
case remotecommandconsts.StreamProtocolV4Name:
handler = &v4ProtocolHandler{}
case remotecommandconsts.StreamProtocolV3Name:
handler = &v3ProtocolHandler{}
case remotecommandconsts.StreamProtocolV2Name:
handler = &v2ProtocolHandler{}
case "":
klog.V(4).Infof("Client did not request protocol negotiation. Falling back to %q", remotecommandconsts.StreamProtocolV1Name)
fallthrough
case remotecommandconsts.StreamProtocolV1Name:
handler = &v1ProtocolHandler{}
}
// count the streams client asked for, starting with 1
expectedStreams := 1
if opts.Stdin {
expectedStreams++
}
if opts.Stdout {
expectedStreams++
}
if opts.Stderr {
expectedStreams++
}
if opts.TTY && handler.supportsTerminalResizing() {
expectedStreams++
}
expired := time.NewTimer(streamCreationTimeout)
defer expired.Stop()
ctx, err := handler.waitForStreams(streamCh, expectedStreams, expired.C)
if err != nil {
runtime.HandleError(err)
return nil, false
}
ctx.conn = conn
ctx.tty = opts.TTY
return ctx, true
}
type protocolHandler interface {
// waitForStreams waits for the expected streams or a timeout, returning a
// remoteCommandContext if all the streams were received, or an error if not.
waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error)
// supportsTerminalResizing returns true if the protocol handler supports terminal resizing
supportsTerminalResizing() bool
}
// v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs
// in from v3 in the error stream format using an json-marshaled metav1.Status which carries
// the process' exit code.
type v4ProtocolHandler struct{}
func (*v4ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeResize:
ctx.resizeStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, errors.New("timed out waiting for client to create streams")
}
}
return ctx, nil
}
// supportsTerminalResizing returns true because v4ProtocolHandler supports it
func (*v4ProtocolHandler) supportsTerminalResizing() bool { return true }
// v3ProtocolHandler implements the V3 protocol version for streaming command execution.
type v3ProtocolHandler struct{}
func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeResize:
ctx.resizeStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, errors.New("timed out waiting for client to create streams")
}
}
return ctx, nil
}
// supportsTerminalResizing returns true because v3ProtocolHandler supports it
func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true }
// v2ProtocolHandler implements the V2 protocol version for streaming command execution.
type v2ProtocolHandler struct{}
func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, errors.New("timed out waiting for client to create streams")
}
}
return ctx, nil
}
// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it.
func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false }
// v1ProtocolHandler implements the V1 protocol version for streaming command execution.
type v1ProtocolHandler struct{}
func (*v1ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.writeStatus = v1WriteStatusFunc(stream)
// This defer statement shouldn't be here, but due to previous refactoring, it ended up in
// here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in
// the v2ProtocolHandler.
defer stream.Reset()
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, errors.New("timed out waiting for client to create streams")
}
}
if ctx.stdinStream != nil {
ctx.stdinStream.Close()
}
return ctx, nil
}
// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }
func handleResizeEvents(stream io.Reader, channel chan<- remotecommand.TerminalSize) {
defer runtime.HandleCrash()
defer close(channel)
decoder := json.NewDecoder(stream)
for {
size := remotecommand.TerminalSize{}
if err := decoder.Decode(&size); err != nil {
break
}
channel <- size
}
}
func v1WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
return func(status *apierrors.StatusError) error {
if status.Status().Status == metav1.StatusSuccess {
return nil // send error messages
}
_, err := stream.Write([]byte(status.Error()))
return err
}
}
// v4WriteStatusFunc returns a WriteStatusFunc that marshals a given api Status
// as json in the error channel.
func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
return func(status *apierrors.StatusError) error {
bs, err := json.Marshal(status.Status())
if err != nil {
return err
}
_, err = stream.Write(bs)
return err
}
}

View File

@@ -0,0 +1,148 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"fmt"
"net/http"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/util/wsstream"
)
const (
stdinChannel = iota
stdoutChannel
stderrChannel
errorChannel
resizeChannel
preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol
preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol
v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
)
// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
// along with the approximate duplex value. It also creates the error (3) and resize (4) channels.
func createChannels(opts *Options) []wsstream.ChannelType {
// open the requested channels, and always open the error channel
channels := make([]wsstream.ChannelType, 5)
channels[stdinChannel] = readChannel(opts.Stdin)
channels[stdoutChannel] = writeChannel(opts.Stdout)
channels[stderrChannel] = writeChannel(opts.Stderr)
channels[errorChannel] = wsstream.WriteChannel
channels[resizeChannel] = wsstream.ReadChannel
return channels
}
// readChannel returns wsstream.ReadChannel if real is true, or wsstream.IgnoreChannel.
func readChannel(real bool) wsstream.ChannelType {
if real {
return wsstream.ReadChannel
}
return wsstream.IgnoreChannel
}
// writeChannel returns wsstream.WriteChannel if real is true, or wsstream.IgnoreChannel.
func writeChannel(real bool) wsstream.ChannelType {
if real {
return wsstream.WriteChannel
}
return wsstream.IgnoreChannel
}
// createWebSocketStreams returns a context containing the websocket connection and
// streams needed to perform an exec or an attach.
func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Options, idleTimeout time.Duration) (*context, bool) {
channels := createChannels(opts)
conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
"": {
Binary: true,
Channels: channels,
},
preV4BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
preV4Base64WebsocketProtocol: {
Binary: false,
Channels: channels,
},
v4BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
v4Base64WebsocketProtocol: {
Binary: false,
Channels: channels,
},
})
conn.SetIdleTimeout(idleTimeout)
negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(req, w), req)
if err != nil {
runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err))
return nil, false
}
// Send an empty message to the lowest writable channel to notify the client the connection is established
// TODO: make generic to SPDY and WebSockets and do it outside of this method?
switch {
case opts.Stdout:
streams[stdoutChannel].Write([]byte{})
case opts.Stderr:
streams[stderrChannel].Write([]byte{})
default:
streams[errorChannel].Write([]byte{})
}
ctx := &context{
conn: conn,
stdinStream: streams[stdinChannel],
stdoutStream: streams[stdoutChannel],
stderrStream: streams[stderrChannel],
tty: opts.TTY,
resizeStream: streams[resizeChannel],
}
switch negotiatedProtocol {
case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol:
ctx.writeStatus = v4WriteStatusFunc(streams[errorChannel])
default:
ctx.writeStatus = v1WriteStatusFunc(streams[errorChannel])
}
return ctx, true
}

View File

@@ -0,0 +1,162 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"container/list"
"crypto/rand"
"encoding/base64"
"fmt"
"math"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
var (
// cacheTTL is the timeout after which tokens become invalid.
cacheTTL = 1 * time.Minute
// maxInFlight is the maximum number of in-flight requests to allow.
maxInFlight = 1000
// tokenLen is the length of the random base64 encoded token identifying the request.
tokenLen = 8
)
// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
// random token for their retrieval. The requestCache is used for building streaming URLs without
// the need to encode every request parameter in the URL.
type requestCache struct {
// clock is used to obtain the current time
clock clock.Clock
// tokens maps the generate token to the request for fast retrieval.
tokens map[string]*list.Element
// ll maintains an age-ordered request list for faster garbage collection of expired requests.
ll *list.List
lock sync.Mutex
}
// Type representing an *ExecRequest, *AttachRequest, or *PortForwardRequest.
type request interface{}
type cacheEntry struct {
token string
req request
expireTime time.Time
}
func newRequestCache() *requestCache {
return &requestCache{
clock: clock.RealClock{},
ll: list.New(),
tokens: make(map[string]*list.Element),
}
}
// Insert the given request into the cache and returns the token used for fetching it out.
func (c *requestCache) Insert(req request) (token string, err error) {
c.lock.Lock()
defer c.lock.Unlock()
// Remove expired entries.
c.gc()
// If the cache is full, reject the request.
if c.ll.Len() == maxInFlight {
return "", NewErrorTooManyInFlight()
}
token, err = c.uniqueToken()
if err != nil {
return "", err
}
ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(cacheTTL)})
c.tokens[token] = ele
return token, nil
}
// Consume the token (remove it from the cache) and return the cached request, if found.
func (c *requestCache) Consume(token string) (req request, found bool) {
c.lock.Lock()
defer c.lock.Unlock()
ele, ok := c.tokens[token]
if !ok {
return nil, false
}
c.ll.Remove(ele)
delete(c.tokens, token)
entry := ele.Value.(*cacheEntry)
if c.clock.Now().After(entry.expireTime) {
// Entry already expired.
return nil, false
}
return entry.req, true
}
// uniqueToken generates a random URL-safe token and ensures uniqueness.
func (c *requestCache) uniqueToken() (string, error) {
const maxTries = 10
// Number of bytes to be tokenLen when base64 encoded.
tokenSize := math.Ceil(float64(tokenLen) * 6 / 8)
rawToken := make([]byte, int(tokenSize))
for i := 0; i < maxTries; i++ {
if _, err := rand.Read(rawToken); err != nil {
return "", err
}
encoded := base64.RawURLEncoding.EncodeToString(rawToken)
token := encoded[:tokenLen]
// If it's unique, return it. Otherwise retry.
if _, exists := c.tokens[encoded]; !exists {
return token, nil
}
}
return "", fmt.Errorf("failed to generate unique token")
}
// Must be write-locked prior to calling.
func (c *requestCache) gc() {
now := c.clock.Now()
for c.ll.Len() > 0 {
oldest := c.ll.Back()
entry := oldest.Value.(*cacheEntry)
if !now.After(entry.expireTime) {
return
}
// Oldest value is expired; remove it.
c.ll.Remove(oldest)
delete(c.tokens, entry.token)
}
}

View File

@@ -0,0 +1,399 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"crypto/tls"
"errors"
"io"
"net"
"net/http"
"net/url"
"path"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
restful "github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/types"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/client-go/tools/remotecommand"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"github.com/containerd/cri/pkg/streaming/portforward"
remotecommandserver "github.com/containerd/cri/pkg/streaming/remotecommand"
)
// Server is the library interface to serve the stream requests.
type Server interface {
http.Handler
// Get the serving URL for the requests.
// Requests must not be nil. Responses may be nil iff an error is returned.
GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
GetPortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
// Start the server.
// addr is the address to serve on (address:port) stayUp indicates whether the server should
// listen until Stop() is called, or automatically stop after all expected connections are
// closed. Calling Get{Exec,Attach,PortForward} increments the expected connection count.
// Function does not return until the server is stopped.
Start(stayUp bool) error
// Stop the server, and terminate any open connections.
Stop() error
}
// Runtime is the interface to execute the commands and provide the streams.
type Runtime interface {
Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error
}
// Config defines the options used for running the stream server.
type Config struct {
// The host:port address the server will listen on.
Addr string
// The optional base URL for constructing streaming URLs. If empty, the baseURL will be
// constructed from the serve address.
// Note that for port "0", the URL port will be set to actual port in use.
BaseURL *url.URL
// How long to leave idle connections open for.
StreamIdleTimeout time.Duration
// How long to wait for clients to create streams. Only used for SPDY streaming.
StreamCreationTimeout time.Duration
// The streaming protocols the server supports (understands and permits). See
// k8s.io/kubernetes/pkg/kubelet/server/remotecommand/constants.go for available protocols.
// Only used for SPDY streaming.
SupportedRemoteCommandProtocols []string
// The streaming protocols the server supports (understands and permits). See
// k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go for available protocols.
// Only used for SPDY streaming.
SupportedPortForwardProtocols []string
// The config for serving over TLS. If nil, TLS will not be used.
TLSConfig *tls.Config
}
// DefaultConfig provides default values for server Config. The DefaultConfig is partial, so
// some fields like Addr must still be provided.
var DefaultConfig = Config{
StreamIdleTimeout: 4 * time.Hour,
StreamCreationTimeout: remotecommandconsts.DefaultStreamCreationTimeout,
SupportedRemoteCommandProtocols: remotecommandconsts.SupportedStreamingProtocols,
SupportedPortForwardProtocols: portforward.SupportedProtocols,
}
// NewServer creates a new Server for stream requests.
// TODO(tallclair): Add auth(n/z) interface & handling.
func NewServer(config Config, runtime Runtime) (Server, error) {
s := &server{
config: config,
runtime: &criAdapter{runtime},
cache: newRequestCache(),
}
if s.config.BaseURL == nil {
s.config.BaseURL = &url.URL{
Scheme: "http",
Host: s.config.Addr,
}
if s.config.TLSConfig != nil {
s.config.BaseURL.Scheme = "https"
}
}
ws := &restful.WebService{}
endpoints := []struct {
path string
handler restful.RouteFunction
}{
{"/exec/{token}", s.serveExec},
{"/attach/{token}", s.serveAttach},
{"/portforward/{token}", s.servePortForward},
}
// If serving relative to a base path, set that here.
pathPrefix := path.Dir(s.config.BaseURL.Path)
for _, e := range endpoints {
for _, method := range []string{"GET", "POST"} {
ws.Route(ws.
Method(method).
Path(path.Join(pathPrefix, e.path)).
To(e.handler))
}
}
handler := restful.NewContainer()
handler.Add(ws)
s.handler = handler
s.server = &http.Server{
Addr: s.config.Addr,
Handler: s.handler,
TLSConfig: s.config.TLSConfig,
}
return s, nil
}
type server struct {
config Config
runtime *criAdapter
handler http.Handler
cache *requestCache
server *http.Server
}
func validateExecRequest(req *runtimeapi.ExecRequest) error {
if req.ContainerId == "" {
return status.Errorf(codes.InvalidArgument, "missing required container_id")
}
if req.Tty && req.Stderr {
// If TTY is set, stderr cannot be true because multiplexing is not
// supported.
return status.Errorf(codes.InvalidArgument, "tty and stderr cannot both be true")
}
if !req.Stdin && !req.Stdout && !req.Stderr {
return status.Errorf(codes.InvalidArgument, "one of stdin, stdout, or stderr must be set")
}
return nil
}
func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if err := validateExecRequest(req); err != nil {
return nil, err
}
token, err := s.cache.Insert(req)
if err != nil {
return nil, err
}
return &runtimeapi.ExecResponse{
Url: s.buildURL("exec", token),
}, nil
}
func validateAttachRequest(req *runtimeapi.AttachRequest) error {
if req.ContainerId == "" {
return status.Errorf(codes.InvalidArgument, "missing required container_id")
}
if req.Tty && req.Stderr {
// If TTY is set, stderr cannot be true because multiplexing is not
// supported.
return status.Errorf(codes.InvalidArgument, "tty and stderr cannot both be true")
}
if !req.Stdin && !req.Stdout && !req.Stderr {
return status.Errorf(codes.InvalidArgument, "one of stdin, stdout, and stderr must be set")
}
return nil
}
func (s *server) GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
if err := validateAttachRequest(req); err != nil {
return nil, err
}
token, err := s.cache.Insert(req)
if err != nil {
return nil, err
}
return &runtimeapi.AttachResponse{
Url: s.buildURL("attach", token),
}, nil
}
func (s *server) GetPortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
if req.PodSandboxId == "" {
return nil, status.Errorf(codes.InvalidArgument, "missing required pod_sandbox_id")
}
token, err := s.cache.Insert(req)
if err != nil {
return nil, err
}
return &runtimeapi.PortForwardResponse{
Url: s.buildURL("portforward", token),
}, nil
}
func (s *server) Start(stayUp bool) error {
if !stayUp {
// TODO(tallclair): Implement this.
return errors.New("stayUp=false is not yet implemented")
}
listener, err := net.Listen("tcp", s.config.Addr)
if err != nil {
return err
}
// Use the actual address as baseURL host. This handles the "0" port case.
s.config.BaseURL.Host = listener.Addr().String()
if s.config.TLSConfig != nil {
return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
}
return s.server.Serve(listener)
}
func (s *server) Stop() error {
return s.server.Close()
}
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}
func (s *server) buildURL(method, token string) string {
return s.config.BaseURL.ResolveReference(&url.URL{
Path: path.Join(method, token),
}).String()
}
func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
token := req.PathParameter("token")
cachedRequest, ok := s.cache.Consume(token)
if !ok {
http.NotFound(resp.ResponseWriter, req.Request)
return
}
exec, ok := cachedRequest.(*runtimeapi.ExecRequest)
if !ok {
http.NotFound(resp.ResponseWriter, req.Request)
return
}
streamOpts := &remotecommandserver.Options{
Stdin: exec.Stdin,
Stdout: exec.Stdout,
Stderr: exec.Stderr,
TTY: exec.Tty,
}
remotecommandserver.ServeExec(
resp.ResponseWriter,
req.Request,
s.runtime,
"", // unused: podName
"", // unusued: podUID
exec.ContainerId,
exec.Cmd,
streamOpts,
s.config.StreamIdleTimeout,
s.config.StreamCreationTimeout,
s.config.SupportedRemoteCommandProtocols)
}
func (s *server) serveAttach(req *restful.Request, resp *restful.Response) {
token := req.PathParameter("token")
cachedRequest, ok := s.cache.Consume(token)
if !ok {
http.NotFound(resp.ResponseWriter, req.Request)
return
}
attach, ok := cachedRequest.(*runtimeapi.AttachRequest)
if !ok {
http.NotFound(resp.ResponseWriter, req.Request)
return
}
streamOpts := &remotecommandserver.Options{
Stdin: attach.Stdin,
Stdout: attach.Stdout,
Stderr: attach.Stderr,
TTY: attach.Tty,
}
remotecommandserver.ServeAttach(
resp.ResponseWriter,
req.Request,
s.runtime,
"", // unused: podName
"", // unusued: podUID
attach.ContainerId,
streamOpts,
s.config.StreamIdleTimeout,
s.config.StreamCreationTimeout,
s.config.SupportedRemoteCommandProtocols)
}
func (s *server) servePortForward(req *restful.Request, resp *restful.Response) {
token := req.PathParameter("token")
cachedRequest, ok := s.cache.Consume(token)
if !ok {
http.NotFound(resp.ResponseWriter, req.Request)
return
}
pf, ok := cachedRequest.(*runtimeapi.PortForwardRequest)
if !ok {
http.NotFound(resp.ResponseWriter, req.Request)
return
}
portForwardOptions, err := portforward.BuildV4Options(pf.Port)
if err != nil {
resp.WriteError(http.StatusBadRequest, err)
return
}
portforward.ServePortForward(
resp.ResponseWriter,
req.Request,
s.runtime,
pf.PodSandboxId,
"", // unused: podUID
portForwardOptions,
s.config.StreamIdleTimeout,
s.config.StreamCreationTimeout,
s.config.SupportedPortForwardProtocols)
}
// criAdapter wraps the Runtime functions to conform to the remotecommand interfaces.
// The adapter binds the container ID to the container name argument, and the pod sandbox ID to the pod name.
type criAdapter struct {
Runtime
}
var _ remotecommandserver.Executor = &criAdapter{}
var _ remotecommandserver.Attacher = &criAdapter{}
var _ portforward.PortForwarder = &criAdapter{}
func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
return a.Runtime.Exec(container, cmd, in, out, err, tty, resize)
}
func (a *criAdapter) AttachContainer(podName string, podUID types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
return a.Runtime.Attach(container, in, out, err, tty, resize)
}
func (a *criAdapter) PortForward(podName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
return a.Runtime.PortForward(podName, port, stream)
}

View File

@@ -1,13 +1,13 @@
# cri dependencies
github.com/docker/docker 4634ce647cf2ce2c6031129ccd109e557244986f
github.com/opencontainers/selinux bb88c45a3863dc4c38320d71b890bb30ef9feba4
github.com/opencontainers/selinux v1.5.1
github.com/tchap/go-patricia v2.2.6
# containerd dependencies
github.com/beorn7/perks v1.0.1
github.com/BurntSushi/toml v0.3.1
github.com/cespare/xxhash/v2 v2.1.1
github.com/containerd/cgroups b4448137398923af7f4918b8b2ad8249172ca7a6
github.com/containerd/cgroups e9676da73eddf8ed2433f77aaf3b9cf8f0f75b8c
github.com/containerd/console v1.0.0
github.com/containerd/containerd v1.4.0-beta.0
github.com/containerd/continuity d3ef23f19fbb106bb73ffde425d07a9187e30745
@@ -16,14 +16,14 @@ github.com/containerd/go-runc 7016d3ce2328dd2cb1192b2076eb
github.com/containerd/ttrpc v1.0.1
github.com/containerd/typeurl v1.0.1
github.com/coreos/go-systemd/v22 v22.0.0
github.com/cpuguy83/go-md2man v1.0.10
github.com/cpuguy83/go-md2man/v2 v2.0.0
github.com/docker/go-events e31b211e4f1cd09aa76fe4ac244571fab96ae47f
github.com/docker/go-metrics v0.0.1
github.com/docker/go-units v0.4.0
github.com/godbus/dbus/v5 v5.0.3
github.com/gogo/googleapis v1.3.2
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.3
github.com/golang/protobuf v1.3.5
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/errwrap v1.0.0
@@ -36,53 +36,55 @@ github.com/Microsoft/go-winio v0.4.14
github.com/Microsoft/hcsshim v0.8.9
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.0.1
github.com/opencontainers/runc v1.0.0-rc10
github.com/opencontainers/runtime-spec v1.0.2
github.com/opencontainers/runc v1.0.0-rc91
github.com/opencontainers/runtime-spec 237cc4f519e2e8f9b235bacccfa8ef5a84df2875 # v1.0.2-14-g8e2f17c
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.3.0
github.com/prometheus/client_model v0.1.0
github.com/prometheus/common v0.7.0
github.com/prometheus/procfs v0.0.8
github.com/russross/blackfriday v1.5.2
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/prometheus/procfs v0.0.11
github.com/russross/blackfriday/v2 v2.0.1
github.com/shurcooL/sanitized_anchor_name v1.0.0
github.com/sirupsen/logrus v1.6.0
github.com/syndtr/gocapability d98352740cb2c55f81556b63d4a1ec64c5a319c2
github.com/urfave/cli v1.22.0
github.com/urfave/cli v1.22.1 # NOTE: urfave/cli must be <= v1.22.1 due to a regression: https://github.com/urfave/cli/issues/1092
go.etcd.io/bbolt v1.3.3
go.opencensus.io v0.22.0
golang.org/x/net f3200d17e092c607f615320ecaad13d87ad9a2b3
golang.org/x/sync 42b317875d0fa942474b76e1b46a6060d720ae6e
golang.org/x/sys 5c8b2ff67527cb88b770f693cebf3799036d8bc0
golang.org/x/sys 9dae0f8f577553e0f21298e18926efc9644c281d
golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4
google.golang.org/genproto e50cd9704f63023d62cd06a1994b98227fc4d21a
google.golang.org/grpc v1.27.1
# cgroups dependencies
github.com/cilium/ebpf 4032b1d8aae306b7bb94a2a11002932caf88c644
github.com/cilium/ebpf 1c8d4c9ef7759622653a1d319284a44652333b28
# kubernetes dependencies
github.com/davecgh/go-spew v1.1.1
github.com/docker/spdystream 449fdfce4d962303d702fec724ef0ad181c92528
github.com/emicklei/go-restful v2.9.5
github.com/go-logr/logr v0.2.0
github.com/google/gofuzz v1.1.0
github.com/json-iterator/go v1.1.8
github.com/json-iterator/go v1.1.9
github.com/modern-go/concurrent 1.0.3
github.com/modern-go/reflect2 v1.0.1
github.com/pmezard/go-difflib v1.0.0
github.com/seccomp/libseccomp-golang v0.9.1
github.com/stretchr/testify v1.4.0
golang.org/x/crypto bac4c82f69751a6dd76e702d54b3ceb88adab236
golang.org/x/oauth2 0f29369cfe4552d0e4bcddc57cc75f4d7e672a33
golang.org/x/time 9d24e82272b4f38b78bc8cff74fa936d31ccd8ef
golang.org/x/oauth2 858c2ad4c8b6c5d10852cb89079f6ca1c7309787
golang.org/x/time 555d28b269f0569763d25dbe1a237ae74c6bcc82
gopkg.in/inf.v0 v0.9.1
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.18.2
k8s.io/apimachinery v0.18.2
k8s.io/apiserver v0.18.2
k8s.io/client-go v0.18.2
k8s.io/cri-api v0.18.2
k8s.io/klog v1.0.0
k8s.io/kubernetes v1.18.2
k8s.io/utils a9aa75ae1b89e1b992c33383f48e942d97e52dae
k8s.io/api v0.19.0-beta.2
k8s.io/apiserver v0.19.0-beta.2
k8s.io/apimachinery v0.19.0-beta.2
k8s.io/client-go v0.19.0-beta.2
k8s.io/component-base v0.19.0-beta.2
k8s.io/cri-api v0.19.0-beta.2
k8s.io/klog/v2 v2.2.0
k8s.io/utils 2df71ebbae66f39338aed4cd0bb82d2212ee33cc
sigs.k8s.io/structured-merge-diff/v3 v3.0.0
sigs.k8s.io/yaml v1.2.0
@@ -90,7 +92,7 @@ sigs.k8s.io/yaml v1.2.0
github.com/containerd/go-cni v1.0.0
github.com/containernetworking/cni v0.7.1
github.com/containernetworking/plugins v0.7.6
github.com/fsnotify/fsnotify v1.4.8
github.com/fsnotify/fsnotify v1.4.9
# image decrypt depedencies
github.com/containerd/imgcrypt v1.0.1