apiservers: add synchronous shutdown mechanism on SIGTERM+INT

This commit is contained in:
Dr. Stefan Schimanski
2017-08-10 10:34:25 +02:00
parent 3537f8fa34
commit 11b25366bc
34 changed files with 324 additions and 66 deletions

View File

@@ -21,6 +21,7 @@ go_test(
deps = [
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)
@@ -62,6 +63,7 @@ go_library(
"//plugin/cmd/kube-scheduler/app/options:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library",

View File

@@ -17,7 +17,6 @@ limitations under the License.
package main
import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/federation/cmd/federation-apiserver/app"
"k8s.io/kubernetes/federation/cmd/federation-apiserver/app/options"
)
@@ -30,9 +29,10 @@ func NewFederationAPIServer() *Server {
hks := Server{
SimpleUsage: "federation-apiserver",
Long: "The API entrypoint for the federation control plane",
Run: func(_ *Server, args []string) error {
return app.Run(s, wait.NeverStop)
Run: func(_ *Server, args []string, stopCh <-chan struct{}) error {
return app.Run(s, stopCh)
},
RespectsStopCh: true,
}
s.AddFlags(hks.Flags())
return &hks

View File

@@ -29,7 +29,7 @@ func NewFederationCMServer() *Server {
hks := Server{
SimpleUsage: "federation-controller-manager",
Long: "Controller manager for federation control plane. Manages federation service endpoints and controllers",
Run: func(_ *Server, args []string) error {
Run: func(_ *Server, args []string, stopCh <-chan struct{}) error {
return app.Run(s)
},
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -25,12 +25,14 @@ import (
"os"
"path"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server"
utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/logs"
utiltemplate "k8s.io/kubernetes/pkg/util/template"
"k8s.io/kubernetes/pkg/version/verflag"
"github.com/spf13/pflag"
)
// HyperKube represents a single binary that can morph/manage into multiple
@@ -115,7 +117,7 @@ func (hk *HyperKube) Printf(format string, i ...interface{}) {
}
// Run the server. This will pick the appropriate server and run it.
func (hk *HyperKube) Run(args []string) error {
func (hk *HyperKube) Run(args []string, stopCh <-chan struct{}) error {
// If we are called directly, parse all flags up to the first real
// argument. That should be the server to run.
command := args[0]
@@ -174,7 +176,22 @@ func (hk *HyperKube) Run(args []string) error {
logs.InitLogs()
defer logs.FlushLogs()
err = s.Run(s, s.Flags().Args())
if !s.RespectsStopCh {
// For commands that do not respect the stopCh, we run them in a go
// routine and leave them running when stopCh is closed.
errCh := make(chan error)
go func() {
errCh <- s.Run(s, s.Flags().Args(), wait.NeverStop)
}()
select {
case <-stopCh:
return errors.New("interrupted") // This error text is ignored.
case err = <-errCh:
// fall-through
}
} else {
err = s.Run(s, s.Flags().Args(), stopCh)
}
if err != nil {
hk.Println("Error:", err)
}
@@ -184,11 +201,10 @@ func (hk *HyperKube) Run(args []string) error {
// RunToExit will run the hyperkube and then call os.Exit with an appropriate exit code.
func (hk *HyperKube) RunToExit(args []string) {
err := hk.Run(args)
if err != nil {
stopCh := server.SetupSignalHandler()
if err := hk.Run(args, stopCh); err != nil {
os.Exit(1)
}
os.Exit(0)
}
// Usage will write out a summary for all servers that this binary supports.

View File

@@ -22,9 +22,12 @@ import (
"fmt"
"strings"
"testing"
"time"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/wait"
)
type result struct {
@@ -36,7 +39,7 @@ func testServer(n string) *Server {
return &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A simple server named %s", n),
Run: func(s *Server, args []string) error {
Run: func(s *Server, args []string, stopCh <-chan struct{}) error {
s.hk.Printf("%s Run\n", s.Name())
return nil
},
@@ -46,12 +49,47 @@ func testServerError(n string) *Server {
return &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A simple server named %s that returns an error", n),
Run: func(s *Server, args []string) error {
Run: func(s *Server, args []string, stopCh <-chan struct{}) error {
s.hk.Printf("%s Run\n", s.Name())
return errors.New("server returning error")
},
}
}
func testStopChRespectingServer(n string) *Server {
return &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A simple server named %s", n),
Run: func(s *Server, args []string, stopCh <-chan struct{}) error {
s.hk.Printf("%s Run\n", s.Name())
<-stopCh
return nil
},
RespectsStopCh: true,
}
}
func testStopChIgnoringServer(n string) *Server {
return &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A simple server named %s", n),
Run: func(s *Server, args []string, stopCh <-chan struct{}) error {
<-wait.NeverStop // this leaks obviously, but we don't care about one go routine more or less in test
return nil
},
RespectsStopCh: false,
}
}
func testStopChRespectingServerWithError(n string) *Server {
return &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A simple server named %s", n),
Run: func(s *Server, args []string, stopCh <-chan struct{}) error {
s.hk.Printf("%s Run\n", s.Name())
<-stopCh
return errors.New("server returning error")
},
RespectsStopCh: true,
}
}
const defaultCobraMessage = "default message from cobra command"
const defaultCobraSubMessage = "default sub-message from cobra command"
@@ -91,7 +129,7 @@ func testCobraCommand(n string) *Server {
s := &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A server named %s which uses a cobra command", n),
Run: func(s *Server, args []string) error {
Run: func(s *Server, args []string, stopCh <-chan struct{}) error {
cobraServer = s
cmd.SetOutput(s.hk.Out())
cmd.SetArgs(args)
@@ -102,7 +140,7 @@ func testCobraCommand(n string) *Server {
return s
}
func runFull(t *testing.T, args string) *result {
func runFull(t *testing.T, args string, stopCh <-chan struct{}) *result {
buf := new(bytes.Buffer)
hk := HyperKube{
Name: "hyperkube",
@@ -114,11 +152,14 @@ func runFull(t *testing.T, args string) *result {
hk.AddServer(testServer("test2"))
hk.AddServer(testServer("test3"))
hk.AddServer(testServerError("test-error"))
hk.AddServer(testStopChIgnoringServer("test-stop-ch-ignoring"))
hk.AddServer(testStopChRespectingServer("test-stop-ch-respecting"))
hk.AddServer(testStopChRespectingServerWithError("test-error-stop-ch-respecting"))
hk.AddServer(testCobraCommand("test-cobra-command"))
a := strings.Split(args, " ")
t.Logf("Running full with args: %q", a)
err := hk.Run(a)
err := hk.Run(a, stopCh)
r := &result{err, buf.String()}
t.Logf("Result err: %v, output: %q", r.err, r.output)
@@ -127,37 +168,37 @@ func runFull(t *testing.T, args string) *result {
}
func TestRun(t *testing.T) {
x := runFull(t, "hyperkube test1")
x := runFull(t, "hyperkube test1", wait.NeverStop)
assert.Contains(t, x.output, "test1 Run")
assert.NoError(t, x.err)
}
func TestLinkRun(t *testing.T) {
x := runFull(t, "test1")
x := runFull(t, "test1", wait.NeverStop)
assert.Contains(t, x.output, "test1 Run")
assert.NoError(t, x.err)
}
func TestTopNoArgs(t *testing.T) {
x := runFull(t, "hyperkube")
x := runFull(t, "hyperkube", wait.NeverStop)
assert.EqualError(t, x.err, "no server specified")
}
func TestBadServer(t *testing.T) {
x := runFull(t, "hyperkube bad-server")
x := runFull(t, "hyperkube bad-server", wait.NeverStop)
assert.EqualError(t, x.err, "Server not found: bad-server")
assert.Contains(t, x.output, "Usage")
}
func TestTopHelp(t *testing.T) {
x := runFull(t, "hyperkube --help")
x := runFull(t, "hyperkube --help", wait.NeverStop)
assert.NoError(t, x.err)
assert.Contains(t, x.output, "all-in-one")
assert.Contains(t, x.output, "A simple server named test1")
}
func TestTopFlags(t *testing.T) {
x := runFull(t, "hyperkube --help test1")
x := runFull(t, "hyperkube --help test1", wait.NeverStop)
assert.NoError(t, x.err)
assert.Contains(t, x.output, "all-in-one")
assert.Contains(t, x.output, "A simple server named test1")
@@ -165,14 +206,14 @@ func TestTopFlags(t *testing.T) {
}
func TestTopFlagsBad(t *testing.T) {
x := runFull(t, "hyperkube --bad-flag")
x := runFull(t, "hyperkube --bad-flag", wait.NeverStop)
assert.EqualError(t, x.err, "unknown flag: --bad-flag")
assert.Contains(t, x.output, "all-in-one")
assert.Contains(t, x.output, "A simple server named test1")
}
func TestServerHelp(t *testing.T) {
x := runFull(t, "hyperkube test1 --help")
x := runFull(t, "hyperkube test1 --help", wait.NeverStop)
assert.NoError(t, x.err)
assert.Contains(t, x.output, "A simple server named test1")
assert.Contains(t, x.output, "-h, --help")
@@ -181,7 +222,7 @@ func TestServerHelp(t *testing.T) {
}
func TestServerFlagsBad(t *testing.T) {
x := runFull(t, "hyperkube test1 --bad-flag")
x := runFull(t, "hyperkube test1 --bad-flag", wait.NeverStop)
assert.EqualError(t, x.err, "unknown flag: --bad-flag")
assert.Contains(t, x.output, "A simple server named test1")
assert.Contains(t, x.output, "-h, --help")
@@ -190,36 +231,91 @@ func TestServerFlagsBad(t *testing.T) {
}
func TestServerError(t *testing.T) {
x := runFull(t, "hyperkube test-error")
x := runFull(t, "hyperkube test-error", wait.NeverStop)
assert.Contains(t, x.output, "test-error Run")
assert.EqualError(t, x.err, "server returning error")
}
func TestStopChIgnoringServer(t *testing.T) {
stopCh := make(chan struct{})
returnedCh := make(chan struct{})
var x *result
go func() {
defer close(returnedCh)
x = runFull(t, "hyperkube test-stop-ch-ignoring", stopCh)
}()
close(stopCh)
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("%q never returned after stopCh was closed", "hyperkube test-stop-ch-ignoring")
case <-returnedCh:
}
// we cannot be sure that the server had a chance to output anything
// assert.Contains(t, x.output, "test-error-stop-ch-ignoring Run")
assert.EqualError(t, x.err, "interrupted")
}
func TestStopChRespectingServer(t *testing.T) {
stopCh := make(chan struct{})
returnedCh := make(chan struct{})
var x *result
go func() {
defer close(returnedCh)
x = runFull(t, "hyperkube test-stop-ch-respecting", stopCh)
}()
close(stopCh)
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("%q never returned after stopCh was closed", "hyperkube test-stop-ch-respecting")
case <-returnedCh:
}
assert.Contains(t, x.output, "test-stop-ch-respecting Run")
assert.Nil(t, x.err)
}
func TestStopChRespectingServerWithError(t *testing.T) {
stopCh := make(chan struct{})
returnedCh := make(chan struct{})
var x *result
go func() {
defer close(returnedCh)
x = runFull(t, "hyperkube test-error-stop-ch-respecting", stopCh)
}()
close(stopCh)
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("%q never returned after stopCh was closed", "hyperkube test-error-stop-ch-respecting")
case <-returnedCh:
}
assert.Contains(t, x.output, "test-error-stop-ch-respecting Run")
assert.EqualError(t, x.err, "server returning error")
}
func TestCobraCommandHelp(t *testing.T) {
x := runFull(t, "hyperkube test-cobra-command --help")
x := runFull(t, "hyperkube test-cobra-command --help", wait.NeverStop)
assert.NoError(t, x.err)
assert.Contains(t, x.output, "A server named test-cobra-command which uses a cobra command")
assert.Contains(t, x.output, cobraMessageDesc)
}
func TestCobraCommandDefaultMessage(t *testing.T) {
x := runFull(t, "hyperkube test-cobra-command")
x := runFull(t, "hyperkube test-cobra-command", wait.NeverStop)
assert.Contains(t, x.output, fmt.Sprintf("msg: %s", defaultCobraMessage))
}
func TestCobraCommandMessage(t *testing.T) {
x := runFull(t, "hyperkube test-cobra-command --msg foobar")
x := runFull(t, "hyperkube test-cobra-command --msg foobar", wait.NeverStop)
assert.Contains(t, x.output, "msg: foobar")
}
func TestCobraSubCommandHelp(t *testing.T) {
x := runFull(t, "hyperkube test-cobra-command subcommand --help")
x := runFull(t, "hyperkube test-cobra-command subcommand --help", wait.NeverStop)
assert.NoError(t, x.err)
assert.Contains(t, x.output, cobraSubMessageDesc)
}
func TestCobraSubCommandDefaultMessage(t *testing.T) {
x := runFull(t, "hyperkube test-cobra-command subcommand")
x := runFull(t, "hyperkube test-cobra-command subcommand", wait.NeverStop)
assert.Contains(t, x.output, fmt.Sprintf("submsg: %s", defaultCobraSubMessage))
}
func TestCobraSubCommandMessage(t *testing.T) {
x := runFull(t, "hyperkube test-cobra-command subcommand --submsg foobar")
x := runFull(t, "hyperkube test-cobra-command subcommand --submsg foobar", wait.NeverStop)
assert.Contains(t, x.output, "submsg: foobar")
}

View File

@@ -19,7 +19,6 @@ package main
import (
"os"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kube-aggregator/pkg/cmd/server"
)
@@ -33,18 +32,19 @@ func NewKubeAggregator() *Server {
AlternativeName: "kube-aggregator",
SimpleUsage: "aggregator",
Long: "Aggregator for Kubernetes-style API servers: dynamic registration, discovery summarization, secure proxy.",
Run: func(_ *Server, args []string) error {
Run: func(_ *Server, args []string, stopCh <-chan struct{}) error {
if err := o.Complete(); err != nil {
return err
}
if err := o.Validate(args); err != nil {
return err
}
if err := o.RunAggregator(wait.NeverStop); err != nil {
if err := o.RunAggregator(stopCh); err != nil {
return err
}
return nil
},
RespectsStopCh: true,
}
o.AddFlags(hks.Flags())

View File

@@ -17,7 +17,6 @@ limitations under the License.
package main
import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
)
@@ -32,9 +31,10 @@ func NewKubeAPIServer() *Server {
AlternativeName: "kube-apiserver",
SimpleUsage: "apiserver",
Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.",
Run: func(_ *Server, args []string) error {
return app.Run(s, wait.NeverStop)
Run: func(_ *Server, args []string, stopCh <-chan struct{}) error {
return app.Run(s, stopCh)
},
RespectsStopCh: true,
}
s.AddFlags(hks.Flags())
return &hks

View File

@@ -31,7 +31,7 @@ func NewKubeControllerManager() *Server {
AlternativeName: "kube-controller-manager",
SimpleUsage: "controller-manager",
Long: "A server that runs a set of active components. This includes replication controllers, service endpoints and nodes.",
Run: func(_ *Server, args []string) error {
Run: func(_ *Server, args []string, stopCh <-chan struct{}) error {
return app.Run(s)
},
}

View File

@@ -45,7 +45,7 @@ func NewKubeProxy() *Server {
// refactored to use cobra throughout.
command.Flags().AddGoFlagSet(flag.CommandLine)
hks.Run = func(_ *Server, args []string) error {
hks.Run = func(_ *Server, args []string, stopCh <-chan struct{}) error {
command.SetArgs(args)
return command.Execute()
}

View File

@@ -31,7 +31,7 @@ func NewScheduler() *Server {
AlternativeName: "kube-scheduler",
SimpleUsage: "scheduler",
Long: "Implements a Kubernetes scheduler. This will assign pods to kubelets based on capacity and constraints.",
Run: func(_ *Server, _ []string) error {
Run: func(_ *Server, _ []string, stopCh <-chan struct{}) error {
return app.Run(s)
},
}

View File

@@ -33,7 +33,7 @@ func NewKubectlServer() *Server {
name: "kubectl",
SimpleUsage: "Kubernetes command line client",
Long: "Kubernetes command line client",
Run: func(s *Server, args []string) error {
Run: func(s *Server, args []string, stopCh <-chan struct{}) error {
cmd.SetArgs(args)
return cmd.Execute()
},

View File

@@ -38,7 +38,7 @@ func NewKubelet() (*Server, error) {
queries Docker to see what is currently running. It synchronizes the
configuration data, with the running set of containers by starting or stopping
Docker containers.`,
Run: func(_ *Server, _ []string) error {
Run: func(_ *Server, _ []string, stopCh <-chan struct{}) error {
return app.Run(s, nil)
},
}

View File

@@ -26,7 +26,7 @@ import (
"github.com/spf13/pflag"
)
type serverRunFunc func(s *Server, args []string) error
type serverRunFunc func(s *Server, args []string, stopCh <-chan struct{}) error
// Server describes a server that this binary can morph into.
type Server struct {
@@ -34,6 +34,7 @@ type Server struct {
Long string // Longer free form description of the server
Run serverRunFunc // Run the server. This is not expected to return.
AlternativeName string
RespectsStopCh bool
flags *pflag.FlagSet // Flags for the command (and all dependents)
name string