Merge pull request #3459 from crosbymichael/timeout-config
Allow timeouts to be configured in config
This commit is contained in:
commit
fc9335d75c
@ -22,6 +22,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/BurntSushi/toml"
|
"github.com/BurntSushi/toml"
|
||||||
|
"github.com/containerd/containerd/pkg/timeout"
|
||||||
"github.com/containerd/containerd/services/server"
|
"github.com/containerd/containerd/services/server"
|
||||||
srvconfig "github.com/containerd/containerd/services/server/config"
|
srvconfig "github.com/containerd/containerd/services/server/config"
|
||||||
"github.com/urfave/cli"
|
"github.com/urfave/cli"
|
||||||
@ -68,6 +69,12 @@ var configCommand = cli.Command{
|
|||||||
config.Plugins[p.URI()] = p.Config
|
config.Plugins[p.URI()] = p.Config
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
timeouts := timeout.All()
|
||||||
|
config.Timeouts = make(map[string]string)
|
||||||
|
for k, v := range timeouts {
|
||||||
|
config.Timeouts[k] = v.String()
|
||||||
|
}
|
||||||
|
|
||||||
_, err = config.WriteTo(os.Stdout)
|
_, err = config.WriteTo(os.Stdout)
|
||||||
return err
|
return err
|
||||||
},
|
},
|
||||||
|
66
pkg/timeout/timeout.go
Normal file
66
pkg/timeout/timeout.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
/*
|
||||||
|
Copyright The containerd Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package timeout
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
mu sync.Mutex
|
||||||
|
timeouts = make(map[string]time.Duration)
|
||||||
|
|
||||||
|
// DefaultTimeout of the timeout package
|
||||||
|
DefaultTimeout = 1 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// Set the timeout for the key
|
||||||
|
func Set(key string, t time.Duration) {
|
||||||
|
mu.Lock()
|
||||||
|
timeouts[key] = t
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the timeout for the provided key
|
||||||
|
func Get(key string) time.Duration {
|
||||||
|
mu.Lock()
|
||||||
|
t, ok := timeouts[key]
|
||||||
|
mu.Unlock()
|
||||||
|
if !ok {
|
||||||
|
t = DefaultTimeout
|
||||||
|
}
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithContext returns a context with the specified timeout for the provided key
|
||||||
|
func WithContext(ctx context.Context, key string) (context.Context, func()) {
|
||||||
|
t := Get(key)
|
||||||
|
return context.WithTimeout(ctx, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// All returns all keys and their timeouts
|
||||||
|
func All() map[string]time.Duration {
|
||||||
|
out := make(map[string]time.Duration)
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
for k, v := range timeouts {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
@ -32,6 +32,7 @@ import (
|
|||||||
"github.com/containerd/containerd/identifiers"
|
"github.com/containerd/containerd/identifiers"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
|
"github.com/containerd/containerd/pkg/timeout"
|
||||||
"github.com/containerd/containerd/runtime"
|
"github.com/containerd/containerd/runtime"
|
||||||
client "github.com/containerd/containerd/runtime/v2/shim"
|
client "github.com/containerd/containerd/runtime/v2/shim"
|
||||||
"github.com/containerd/containerd/runtime/v2/task"
|
"github.com/containerd/containerd/runtime/v2/task"
|
||||||
@ -41,6 +42,18 @@ import (
|
|||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
loadTimeout = "io.containerd.timeout.shim.load"
|
||||||
|
cleanupTimeout = "io.containerd.timeout.shim.cleanup"
|
||||||
|
shutdownTimeout = "io.containerd.timeout.shim.shutdown"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
timeout.Set(loadTimeout, 5*time.Second)
|
||||||
|
timeout.Set(cleanupTimeout, 5*time.Second)
|
||||||
|
timeout.Set(shutdownTimeout, 3*time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
func loadAddress(path string) (string, error) {
|
func loadAddress(path string) (string, error) {
|
||||||
data, err := ioutil.ReadFile(path)
|
data, err := ioutil.ReadFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -100,7 +113,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
|
|||||||
events: events,
|
events: events,
|
||||||
rtTasks: rt,
|
rtTasks: rt,
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := s.Connect(ctx); err != nil {
|
if err := s.Connect(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -110,7 +123,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
|
|||||||
|
|
||||||
func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) {
|
func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) {
|
||||||
ctx = namespaces.WithNamespace(ctx, ns)
|
ctx = namespaces.WithNamespace(ctx, ns)
|
||||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
ctx, cancel := timeout.WithContext(ctx, cleanupTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
log.G(ctx).WithFields(logrus.Fields{
|
log.G(ctx).WithFields(logrus.Fields{
|
||||||
@ -185,7 +198,7 @@ func (s *shim) Shutdown(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *shim) waitShutdown(ctx context.Context) error {
|
func (s *shim) waitShutdown(ctx context.Context) error {
|
||||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
ctx, cancel := timeout.WithContext(ctx, shutdownTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
return s.Shutdown(ctx)
|
return s.Shutdown(ctx)
|
||||||
}
|
}
|
||||||
|
@ -55,6 +55,8 @@ type Config struct {
|
|||||||
Cgroup CgroupConfig `toml:"cgroup"`
|
Cgroup CgroupConfig `toml:"cgroup"`
|
||||||
// ProxyPlugins configures plugins which are communicated to over GRPC
|
// ProxyPlugins configures plugins which are communicated to over GRPC
|
||||||
ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"`
|
ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"`
|
||||||
|
// Timeouts specified as a duration
|
||||||
|
Timeouts map[string]string `toml:"timeouts"`
|
||||||
|
|
||||||
StreamProcessors []StreamProcessor `toml:"stream_processors"`
|
StreamProcessors []StreamProcessor `toml:"stream_processors"`
|
||||||
|
|
||||||
|
@ -40,6 +40,7 @@ import (
|
|||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/metadata"
|
"github.com/containerd/containerd/metadata"
|
||||||
"github.com/containerd/containerd/pkg/dialer"
|
"github.com/containerd/containerd/pkg/dialer"
|
||||||
|
"github.com/containerd/containerd/pkg/timeout"
|
||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
srvconfig "github.com/containerd/containerd/services/server/config"
|
srvconfig "github.com/containerd/containerd/services/server/config"
|
||||||
"github.com/containerd/containerd/snapshots"
|
"github.com/containerd/containerd/snapshots"
|
||||||
@ -77,6 +78,13 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
|||||||
if err := apply(ctx, config); err != nil {
|
if err := apply(ctx, config); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
for key, sec := range config.Timeouts {
|
||||||
|
d, err := time.ParseDuration(sec)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Errorf("unable to parse %s into a time duration", sec)
|
||||||
|
}
|
||||||
|
timeout.Set(key, d)
|
||||||
|
}
|
||||||
plugins, err := LoadPlugins(ctx, config)
|
plugins, err := LoadPlugins(ctx, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -40,6 +40,7 @@ import (
|
|||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/metadata"
|
"github.com/containerd/containerd/metadata"
|
||||||
"github.com/containerd/containerd/mount"
|
"github.com/containerd/containerd/mount"
|
||||||
|
"github.com/containerd/containerd/pkg/timeout"
|
||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
"github.com/containerd/containerd/runtime"
|
"github.com/containerd/containerd/runtime"
|
||||||
"github.com/containerd/containerd/runtime/linux/runctypes"
|
"github.com/containerd/containerd/runtime/linux/runctypes"
|
||||||
@ -61,6 +62,10 @@ var (
|
|||||||
empty = &ptypes.Empty{}
|
empty = &ptypes.Empty{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
stateTimeout = "io.containerd.timeout.task.state"
|
||||||
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
plugin.Register(&plugin.Registration{
|
plugin.Register(&plugin.Registration{
|
||||||
Type: plugin.ServicePlugin,
|
Type: plugin.ServicePlugin,
|
||||||
@ -68,6 +73,8 @@ func init() {
|
|||||||
Requires: tasksServiceRequires,
|
Requires: tasksServiceRequires,
|
||||||
InitFn: initFunc,
|
InitFn: initFunc,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
timeout.Set(stateTimeout, 2*time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
func initFunc(ic *plugin.InitContext) (interface{}, error) {
|
func initFunc(ic *plugin.InitContext) (interface{}, error) {
|
||||||
@ -266,7 +273,7 @@ func (l *local) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getProcessState(ctx context.Context, p runtime.Process) (*task.Process, error) {
|
func getProcessState(ctx context.Context, p runtime.Process) (*task.Process, error) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
ctx, cancel := timeout.WithContext(ctx, stateTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
state, err := p.State(ctx)
|
state, err := p.State(ctx)
|
||||||
|
Loading…
Reference in New Issue
Block a user