Add iptables.Monitor, use it from kubelet and kube-proxy

Kubelet and kube-proxy both had loops that ensured their iptables
rules didn't get deleted, by repeatedly recreating them. But on
systems with lots of iptables rules (i.e., thousands of services), this
can be very slow (and it might end up holding the iptables lock for
several seconds, blocking other operations, etc.).

The specific threat that they need to worry about is
firewall-management commands that flush *all* dynamic iptables rules.
So add a new iptables.Monitor() function that handles this by creating
iptables-flush canaries and only triggering a full rule reload after
noticing that someone has deleted those chains.
Dan Winship 2019-08-13 08:29:39 -04:00
parent b6c3d5416a
commit 3948f16ff4
9 changed files with 376 additions and 5 deletions
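For orientation before the per-file diffs: the sketch below shows how a caller is expected to wire up the new Monitor() API. It is illustrative only (the package name, resyncRules, and the EXAMPLE-CANARY chain are invented for the example); the real call sites are the kubelet and kube-proxy hunks further down.

// Illustrative sketch of using iptables.Monitor; not part of this commit.
package main

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
	utilexec "k8s.io/utils/exec"
)

// resyncRules stands in for the caller's reload function
// (syncNetworkUtil in the kubelet, syncProxyRules in kube-proxy).
func resyncRules() {
	// Recreate this component's iptables chains and rules here.
}

func main() {
	ipt := utiliptables.New(utilexec.New(), utiliptables.ProtocolIpv4)

	// Install the rules once up front...
	resyncRules()

	// ...then let Monitor call resyncRules again only after it notices
	// that the canary chains have been deleted (i.e. the tables were flushed).
	go ipt.Monitor(utiliptables.Chain("EXAMPLE-CANARY"),
		[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
		resyncRules, 1*time.Minute, wait.NeverStop)

	select {} // block so the monitor goroutine keeps running in this sketch
}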


@@ -21,6 +21,7 @@ import (
"fmt"
"net"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/sets"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@@ -335,6 +336,9 @@ func (f *fakeIPTables) RestoreAll(data []byte, flush utiliptables.FlushFlag, cou
return f.restore("", data, flush)
}
func (f *fakeIPTables) Monitor(canary utiliptables.Chain, tables []utiliptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
}
func (f *fakeIPTables) isBuiltinChain(tableName utiliptables.Table, chainName utiliptables.Chain) bool {
if builtinChains, ok := f.builtinChains[string(tableName)]; ok && builtinChains.Has(string(chainName)) {
return true


@@ -1427,9 +1427,9 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Start loop to sync iptables util rules
// Set up iptables util rules
if kl.makeIPTablesUtilChains {
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
kl.initNetworkUtil()
}
// Start a goroutine responsible for killing pods (that are not properly


@@ -20,11 +20,20 @@ package kubelet
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
func (kl *Kubelet) initNetworkUtil() {
kl.syncNetworkUtil()
go kl.iptClient.Monitor(utiliptables.Chain("KUBE-KUBELET-CANARY"),
[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
}
// syncNetworkUtil ensures the network utilities are present on the host.
// Network util includes:
// 1. In nat table, KUBE-MARK-DROP rule to mark connections for dropping


@@ -19,4 +19,4 @@ limitations under the License.
package kubelet
// Do nothing.
func (kl *Kubelet) syncNetworkUtil() {}
func (kl *Kubelet) initNetworkUtil() {}


@@ -323,7 +323,13 @@ func NewProxier(ipt utiliptables.Interface,
}
burstSyncs := 2
klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
// time.Hour is arbitrary.
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
go ipt.Monitor(utiliptables.Chain("KUBE-PROXY-CANARY"),
[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
proxier.syncProxyRules, syncPeriod, wait.NeverStop)
return proxier, nil
}


@@ -19,13 +19,13 @@ go_library(
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/trace:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:linux": [
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
],
"//conditions:default": [],
@@ -36,6 +36,7 @@ go_test(
name = "go_default_test",
srcs = [
"iptables_test.go",
"monitor_test.go",
"save_restore_test.go",
],
embed = [":go_default_library"],
@@ -43,6 +44,7 @@ go_test(
"@io_bazel_rules_go//go/platform:linux": [
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
],


@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilversion "k8s.io/apimachinery/pkg/util/version"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
utilexec "k8s.io/utils/exec"
utiltrace "k8s.io/utils/trace"
@@ -63,6 +64,17 @@ type Interface interface {
Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// RestoreAll is the same as Restore except that no table is specified.
RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// Monitor detects when the given iptables tables have been flushed by an external
// tool (e.g. a firewall reload) by creating canary chains and polling to see if
// they have been deleted. (Specifically, it polls tables[0] every interval until
// the canary has been deleted from there, then waits a short additional time for
// the canaries to be deleted from the remaining tables as well. You can optimize
// the polling by listing a relatively empty table in tables[0]). When a flush is
// detected, this calls the reloadFunc so the caller can reload their own iptables
// rules. If it is unable to create the canary chains (either initially or after
// a reload) it will log an error and stop monitoring.
// (This function should be called from a goroutine.)
Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{})
// HasRandomFully reveals whether `-j MASQUERADE` takes the
// `--random-fully` option. This is helpful to work around a
// Linux kernel bug that sometimes causes multiple flows to get
@@ -480,12 +492,78 @@ func (runner *runner) checkRuleUsingCheck(args []string) (bool, error) {
return false, fmt.Errorf("error checking rule: %v: %s", err, out)
}
const (
// Max time we wait for an iptables flush to complete after we notice it has started
iptablesFlushTimeout = 5 * time.Second
// How often we poll while waiting for an iptables flush to complete
iptablesFlushPollTime = 100 * time.Millisecond
)
// Monitor is part of Interface
func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
for {
for _, table := range tables {
if _, err := runner.EnsureChain(table, canary); err != nil {
klog.Warningf("Could not set up iptables canary %s/%s: %v", string(table), string(canary), err)
return
}
}
// Poll until stopCh is closed or iptables is flushed
err := utilwait.PollUntil(interval, func() (bool, error) {
if runner.chainExists(tables[0], canary) {
return false, nil
}
klog.V(2).Infof("iptables canary %s/%s deleted", string(tables[0]), string(canary))
// Wait for the other canaries to be deleted too before returning
// so we don't start reloading too soon.
err := utilwait.PollImmediate(iptablesFlushPollTime, iptablesFlushTimeout, func() (bool, error) {
for i := 1; i < len(tables); i++ {
if runner.chainExists(tables[i], canary) {
return false, nil
}
}
return true, nil
})
if err != nil {
klog.Warning("Inconsistent iptables state detected.")
}
return true, nil
}, stopCh)
if err != nil {
// stopCh was closed
for _, table := range tables {
_ = runner.DeleteChain(table, canary)
}
return
}
klog.V(2).Infof("Reloading after iptables flush")
reloadFunc()
}
}
// chainExists is used internally by Monitor; none of the public Interface methods can be
// used to distinguish "chain exists" from "chain does not exist" with no side effects
func (runner *runner) chainExists(table Table, chain Chain) bool {
fullArgs := makeFullArgs(table, chain)
runner.mu.Lock()
defer runner.mu.Unlock()
_, err := runner.run(opListChain, fullArgs)
return err == nil
}
type operation string
const (
opCreateChain operation = "-N"
opFlushChain operation = "-F"
opDeleteChain operation = "-X"
opListChain operation = "-L"
opAppendRule operation = "-A"
opCheckRule operation = "-C"
opDeleteRule operation = "-D"


@@ -0,0 +1,268 @@
// +build linux
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package iptables
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/exec"
)
// We can't use the normal FakeExec because we don't know precisely how many times the
// Monitor thread will do its checks, and we don't know precisely how its iptables calls
// will interleave with the main thread's. So we use our own fake Exec implementation that
// implements a minimal iptables interface. This will need updates as iptables.runner
// changes its use of Exec.
type monitorFakeExec struct {
sync.Mutex
tables map[string]sets.String
}
func newMonitorFakeExec() *monitorFakeExec {
tables := make(map[string]sets.String)
tables["mangle"] = sets.NewString()
tables["filter"] = sets.NewString()
tables["nat"] = sets.NewString()
return &monitorFakeExec{tables: tables}
}
func (mfe *monitorFakeExec) Command(cmd string, args ...string) exec.Cmd {
return &monitorFakeCmd{mfe: mfe, cmd: cmd, args: args}
}
func (mfe *monitorFakeExec) CommandContext(ctx context.Context, cmd string, args ...string) exec.Cmd {
return mfe.Command(cmd, args...)
}
func (mfe *monitorFakeExec) LookPath(file string) (string, error) {
return file, nil
}
type monitorFakeCmd struct {
mfe *monitorFakeExec
cmd string
args []string
}
func (mfc *monitorFakeCmd) CombinedOutput() ([]byte, error) {
if mfc.cmd == cmdIPTablesRestore {
// Only used for "iptables-restore --version", and the result doesn't matter
return []byte{}, nil
} else if mfc.cmd != cmdIPTables {
panic("bad command " + mfc.cmd)
}
if len(mfc.args) == 1 && mfc.args[0] == "--version" {
return []byte("iptables v1.6.2"), nil
}
if len(mfc.args) != 6 || mfc.args[0] != WaitString || mfc.args[1] != WaitSecondsValue || mfc.args[4] != "-t" {
panic(fmt.Sprintf("bad args %#v", mfc.args))
}
op := operation(mfc.args[2])
chainName := mfc.args[3]
tableName := mfc.args[5]
mfc.mfe.Lock()
defer mfc.mfe.Unlock()
table := mfc.mfe.tables[tableName]
if table == nil {
return []byte{}, fmt.Errorf("no such table %q", tableName)
}
switch op {
case opCreateChain:
if !table.Has(chainName) {
table.Insert(chainName)
}
return []byte{}, nil
case opListChain:
if table.Has(chainName) {
return []byte{}, nil
} else {
return []byte{}, fmt.Errorf("no such chain %q", chainName)
}
case opDeleteChain:
table.Delete(chainName)
return []byte{}, nil
default:
panic("should not be reached")
}
}
func (mfc *monitorFakeCmd) SetStdin(in io.Reader) {
// Used by getIPTablesRestoreVersionString(), can be ignored
}
func (mfc *monitorFakeCmd) Run() error {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) Output() ([]byte, error) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) SetDir(dir string) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) SetStdout(out io.Writer) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) SetStderr(out io.Writer) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) SetEnv(env []string) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) StdoutPipe() (io.ReadCloser, error) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) StderrPipe() (io.ReadCloser, error) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) Start() error {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) Wait() error {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) Stop() {
panic("should not be reached")
}
func TestIPTablesMonitor(t *testing.T) {
mfe := newMonitorFakeExec()
ipt := New(mfe, ProtocolIpv4)
var reloads uint32
stopCh := make(chan struct{})
canary := Chain("MONITOR-TEST-CANARY")
tables := []Table{TableMangle, TableFilter, TableNAT}
go ipt.Monitor(canary, tables, func() {
if !ensureNoChains(mfe) {
t.Errorf("reload called while canaries still exist")
}
atomic.AddUint32(&reloads, 1)
}, 100*time.Millisecond, stopCh)
// Monitor should create canary chains quickly
if err := waitForChains(mfe, canary, tables); err != nil {
t.Errorf("failed to create iptables canaries: %v", err)
}
if err := waitForReloads(&reloads, 0); err != nil {
t.Errorf("got unexpected reloads: %v", err)
}
// If we delete all of the chains, it should reload
ipt.DeleteChain(TableMangle, canary)
ipt.DeleteChain(TableFilter, canary)
ipt.DeleteChain(TableNAT, canary)
if err := waitForReloads(&reloads, 1); err != nil {
t.Errorf("got unexpected number of reloads after flush: %v", err)
}
if err := waitForChains(mfe, canary, tables); err != nil {
t.Errorf("failed to create iptables canaries: %v", err)
}
// If we delete two chains, it should not reload yet
ipt.DeleteChain(TableMangle, canary)
ipt.DeleteChain(TableFilter, canary)
if err := waitForNoReload(&reloads, 1); err != nil {
t.Errorf("got unexpected number of reloads after partial flush: %v", err)
}
// If we delete the last chain, it should reload now
ipt.DeleteChain(TableNAT, canary)
if err := waitForReloads(&reloads, 2); err != nil {
t.Errorf("got unexpected number of reloads after slow flush: %v", err)
}
if err := waitForChains(mfe, canary, tables); err != nil {
t.Errorf("failed to create iptables canaries: %v", err)
}
// If we close the stop channel, it should stop running
close(stopCh)
if err := waitForNoReload(&reloads, 2); err != nil {
t.Errorf("got unexpected number of reloads after partial flush: %v", err)
}
if !ensureNoChains(mfe) {
t.Errorf("canaries still exist after stopping monitor")
}
}
func waitForChains(mfe *monitorFakeExec, canary Chain, tables []Table) error {
return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
mfe.Lock()
defer mfe.Unlock()
for _, table := range tables {
if !mfe.tables[string(table)].Has(string(canary)) {
return false, nil
}
}
return true, nil
})
}
func ensureNoChains(mfe *monitorFakeExec) bool {
mfe.Lock()
defer mfe.Unlock()
return mfe.tables["mangle"].Len() == 0 &&
mfe.tables["filter"].Len() == 0 &&
mfe.tables["nat"].Len() == 0
}
func waitForReloads(reloads *uint32, expected uint32) error {
if atomic.LoadUint32(reloads) < expected {
utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
return atomic.LoadUint32(reloads) >= expected, nil
})
}
got := atomic.LoadUint32(reloads)
if got != expected {
return fmt.Errorf("expected %d, got %d", expected, got)
}
return nil
}
func waitForNoReload(reloads *uint32, expected uint32) error {
utilwait.PollImmediate(50*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return atomic.LoadUint32(reloads) > expected, nil
})
got := atomic.LoadUint32(reloads)
if got != expected {
return fmt.Errorf("expected %d, got %d", expected, got)
}
return nil
}


@@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"strings"
"time"
"k8s.io/kubernetes/pkg/util/iptables"
)
@@ -99,6 +100,9 @@ func (f *FakeIPTables) RestoreAll(data []byte, flush iptables.FlushFlag, counter
return nil
}
func (f *FakeIPTables) Monitor(canary iptables.Chain, tables []iptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
}
func getToken(line, separator string) string {
tokens := strings.Split(line, separator)
if len(tokens) == 2 {