Support stream idle timeout.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2019-02-27 02:20:34 -08:00
parent fbce57903c
commit 8222da7768
5 changed files with 29 additions and 2 deletions

6
cri.go
View File

@ -19,6 +19,7 @@ package cri
import ( import (
"flag" "flag"
"path/filepath" "path/filepath"
"time"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/containers/v1" "github.com/containerd/containerd/api/services/containers/v1"
@ -118,6 +119,11 @@ func validateConfig(c *criconfig.Config) error {
} }
} }
if c.StreamIdleTimeout != "" {
if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil {
return errors.Wrap(err, "invalid stream idle timeout")
}
}
return nil return nil
} }

View File

@ -12,6 +12,12 @@ The explanation and default value of each configuration item are as follows:
# stream_server_port is the port streaming server is listening on. # stream_server_port is the port streaming server is listening on.
stream_server_port = "0" stream_server_port = "0"
# stream_idle_timeout is the maximum time a streaming connection can be
# idle before the connection is automatically closed.
# The string is in the golang duration format, see:
# https://golang.org/pkg/time/#ParseDuration
stream_idle_timeout = "4h"
# enable_selinux indicates to enable the selinux support. # enable_selinux indicates to enable the selinux support.
enable_selinux = false enable_selinux = false

View File

@ -19,6 +19,7 @@ package config
import ( import (
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
) )
// Runtime struct to contain the type(ID), engine, and root variables for a default runtime // Runtime struct to contain the type(ID), engine, and root variables for a default runtime
@ -124,6 +125,11 @@ type PluginConfig struct {
StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"` StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"`
// StreamServerPort is the port streaming server is listening on. // StreamServerPort is the port streaming server is listening on.
StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"` StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"`
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
StreamIdleTimeout string `toml:"stream_idle_timeout" json:"streamIdleTimeout"`
// EnableSelinux indicates to enable the selinux support. // EnableSelinux indicates to enable the selinux support.
EnableSelinux bool `toml:"enable_selinux" json:"enableSelinux"` EnableSelinux bool `toml:"enable_selinux" json:"enableSelinux"`
// SandboxImage is the image used by sandbox container. // SandboxImage is the image used by sandbox container.
@ -196,6 +202,7 @@ func DefaultConfig() PluginConfig {
}, },
StreamServerAddress: "127.0.0.1", StreamServerAddress: "127.0.0.1",
StreamServerPort: "0", StreamServerPort: "0",
StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour
EnableSelinux: false, EnableSelinux: false,
EnableTLSStreaming: false, EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{ X509KeyPairStreaming: X509KeyPairStreaming{

View File

@ -159,7 +159,7 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
logrus.WithError(err).Error("Failed to load cni during init, please check CRI plugin status before setting up network for pods") logrus.WithError(err).Error("Failed to load cni during init, please check CRI plugin status before setting up network for pods")
} }
// prepare streaming server // prepare streaming server
c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort) c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort, config.StreamIdleTimeout)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to create stream server") return nil, errors.Wrap(err, "failed to create stream server")
} }

View File

@ -22,6 +22,7 @@ import (
"math" "math"
"net" "net"
"os" "os"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
k8snet "k8s.io/apimachinery/pkg/util/net" k8snet "k8s.io/apimachinery/pkg/util/net"
@ -64,7 +65,7 @@ func getStreamListenerMode(c *criService) (streamListenerMode, error) {
return withoutTLS, nil return withoutTLS, nil
} }
func newStreamServer(c *criService, addr, port string) (streaming.Server, error) { func newStreamServer(c *criService, addr, port, streamIdleTimeout string) (streaming.Server, error) {
if addr == "" { if addr == "" {
a, err := k8snet.ChooseBindAddress(nil) a, err := k8snet.ChooseBindAddress(nil)
if err != nil { if err != nil {
@ -73,6 +74,13 @@ func newStreamServer(c *criService, addr, port string) (streaming.Server, error)
addr = a.String() addr = a.String()
} }
config := streaming.DefaultConfig config := streaming.DefaultConfig
if streamIdleTimeout != "" {
var err error
config.StreamIdleTimeout, err = time.ParseDuration(streamIdleTimeout)
if err != nil {
return nil, errors.Wrap(err, "invalid stream idle timeout")
}
}
config.Addr = net.JoinHostPort(addr, port) config.Addr = net.JoinHostPort(addr, port)
run := newStreamRuntime(c) run := newStreamRuntime(c)
tlsMode, err := getStreamListenerMode(c) tlsMode, err := getStreamListenerMode(c)