diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 5f30935b120..9549c4b7f8f 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -35,6 +35,7 @@ type serviceInfo struct { port int protocol string socket proxySocket + timeout time.Duration mu sync.Mutex // protects active active bool } @@ -130,9 +131,6 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -// How long we leave idle UDP connections open. -const udpIdleTimeout = 1 * time.Minute - func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { info, found := proxier.getServiceInfo(service) if !found { @@ -184,7 +182,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { continue } activeClients.clients[cliAddr.String()] = svrConn - go udp.proxyClient(cliAddr, svrConn, activeClients) + go udp.proxyClient(cliAddr, svrConn, activeClients, info.timeout) } activeClients.mu.Unlock() // TODO: It would be nice to let the goroutine handle this write, but we don't @@ -197,7 +195,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { } continue } - svrConn.SetDeadline(time.Now().Add(udpIdleTimeout)) + svrConn.SetDeadline(time.Now().Add(info.timeout)) if err != nil { glog.Errorf("SetDeadline failed: %v", err) continue @@ -206,7 +204,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { } // This function is expected to be called as a goroutine. -func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache) { +func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { defer svrConn.Close() var buffer [4096]byte for { @@ -217,7 +215,7 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ } break } - svrConn.SetDeadline(time.Now().Add(udpIdleTimeout)) + svrConn.SetDeadline(time.Now().Add(timeout)) if err != nil { glog.Errorf("SetDeadline failed: %v", err) break @@ -334,8 +332,9 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) { var unusedPortLock sync.Mutex // addServiceOnUnusedPort starts listening for a new service, returning the -// port it's using. For testing on a system with unknown ports used. -func (proxier *Proxier) addServiceOnUnusedPort(service, protocol string) (string, error) { +// port it's using. For testing on a system with unknown ports used. The timeout only applies to UDP +// connections, for now. +func (proxier *Proxier) addServiceOnUnusedPort(service, protocol string, timeout time.Duration) (string, error) { unusedPortLock.Lock() defer unusedPortLock.Unlock() sock, err := newProxySocket(protocol, proxier.address, 0) @@ -355,6 +354,7 @@ func (proxier *Proxier) addServiceOnUnusedPort(service, protocol string) (string protocol: protocol, active: true, socket: sock, + timeout: timeout, }) proxier.startAccepting(service, sock) return port, nil @@ -365,6 +365,9 @@ func (proxier *Proxier) startAccepting(service string, sock proxySocket) { go sock.ProxyLoop(service, proxier) } +// How long we leave idle UDP connections open. +const udpIdleTimeout = 1 * time.Minute + // OnUpdate manages the active set of service proxies. // Active service proxies are reinitialized if found in the update set or // shutdown if missing from the update set. @@ -395,6 +398,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { protocol: service.Protocol, active: true, socket: sock, + timeout: udpIdleTimeout, }) proxier.startAccepting(service.ID, sock) } diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 03c7550bb78..f62ca8c6f33 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -146,7 +146,7 @@ func TestTCPProxy(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -164,7 +164,7 @@ func TestUDPProxy(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -182,7 +182,7 @@ func TestTCPProxyStop(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -210,7 +210,7 @@ func TestUDPProxyStop(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -238,7 +238,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -265,7 +265,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -292,7 +292,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -324,7 +324,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -356,7 +356,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -401,7 +401,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { p := NewProxier(lb, "127.0.0.1") - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP") + proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -434,3 +434,5 @@ func TestUDPProxyUpdatePort(t *testing.T) { } pc.Close() } + +// TODO: Test UDP timeouts.