From 1e50f118fdba2874b9ba62c38bfea6903ebee813 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Thu, 11 Sep 2014 16:08:25 -0700 Subject: [PATCH] Move UDP backend setup to a function --- pkg/proxy/proxier.go | 83 +++++++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 35 deletions(-) diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 9549c4b7f8f..7d5f48cb5fa 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -40,6 +40,20 @@ type serviceInfo struct { active bool } +func (si *serviceInfo) isActive() bool { + si.mu.Lock() + defer si.mu.Unlock() + return si.active +} + +func (si *serviceInfo) setActive(val bool) bool { + si.mu.Lock() + defer si.mu.Unlock() + tmp := si.active + si.active = val + return tmp +} + // How long we wait for a connection to a backend. const endpointDialTimeout = 5 * time.Second @@ -67,12 +81,9 @@ func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) { return } for { - info.mu.Lock() - if !info.active { - info.mu.Unlock() + if !info.isActive() { break } - info.mu.Unlock() // Block until a connection is made. inConn, err := tcp.Accept() @@ -140,12 +151,9 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { activeClients := newClientCache() var buffer [4096]byte // 4KiB should be enough for most whole-packets for { - info.mu.Lock() - if !info.active { - info.mu.Unlock() + if !info.isActive() { break } - info.mu.Unlock() // Block until data arrives. // TODO: Accumulate a histogram of n or something, to fine tune the buffer size. @@ -161,30 +169,10 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { break } // If this is a client we know already, reuse the connection and goroutine. - activeClients.mu.Lock() - svrConn, found := activeClients.clients[cliAddr.String()] - if !found { - // TODO: This could spin up a new goroutine to make the outbound connection, - // and keep accepting inbound traffic. - glog.Infof("New UDP connection from %s", cliAddr) - endpoint, err := proxier.loadBalancer.NextEndpoint(service, cliAddr) - if err != nil { - glog.Errorf("Couldn't find an endpoint for %s %v", service, err) - activeClients.mu.Unlock() - continue - } - glog.Infof("Mapped service %s to endpoint %s", service, endpoint) - svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout) - if err != nil { - // TODO: Try another endpoint? - glog.Errorf("Dial failed: %v", err) - activeClients.mu.Unlock() - continue - } - activeClients.clients[cliAddr.String()] = svrConn - go udp.proxyClient(cliAddr, svrConn, activeClients, info.timeout) + svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, info.timeout) + if err != nil { + continue } - activeClients.mu.Unlock() // TODO: It would be nice to let the goroutine handle this write, but we don't // really want to copy the buffer. We could do a pool of buffers or something. _, err = svrConn.Write(buffer[0:n]) @@ -203,6 +191,33 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { } } +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service string, timeout time.Duration) (net.Conn, error) { + activeClients.mu.Lock() + defer activeClients.mu.Unlock() + + svrConn, found := activeClients.clients[cliAddr.String()] + if !found { + // TODO: This could spin up a new goroutine to make the outbound connection, + // and keep accepting inbound traffic. + glog.Infof("New UDP connection from %s", cliAddr) + endpoint, err := proxier.loadBalancer.NextEndpoint(service, cliAddr) + if err != nil { + glog.Errorf("Couldn't find an endpoint for %s %v", service, err) + return nil, err + } + glog.Infof("Mapped service %s to endpoint %s", service, endpoint) + svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout) + if err != nil { + // TODO: Try another endpoint? + glog.Errorf("Dial failed: %v", err) + return nil, err + } + activeClients.clients[cliAddr.String()] = svrConn + go udp.proxyClient(cliAddr, svrConn, activeClients, timeout) + } + return svrConn, nil +} + // This function is expected to be called as a goroutine. func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { defer svrConn.Close() @@ -305,9 +320,7 @@ func (proxier *Proxier) StopProxy(service string) error { } func (proxier *Proxier) stopProxyInternal(info *serviceInfo) error { - info.mu.Lock() - defer info.mu.Unlock() - if !info.active { + if !info.setActive(false) { return nil } glog.Infof("Removing service: %s", info.name) @@ -378,7 +391,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { activeServices.Insert(service.ID) info, exists := proxier.getServiceInfo(service.ID) // TODO: check health of the socket? What if ProxyLoop exited? - if exists && info.active && info.port == service.Port { + if exists && info.isActive() && info.port == service.Port { continue } if exists && info.port != service.Port {