proxy: cleanup and minor refactoring

This change includes minor refactoring and cleanup of the proxy
package including the following items:

 * Rename source files with misspelling of round robin
 * Remove unnecessary and redundant comments
 * Update comments for clarity
 * Add locking when updating the round-robin index
 * Improve method receiver names
 * Rename the LoadBalance method to NextEndpoint to add clarity

No changes in behaviour have been introduced.
This commit is contained in:
Kelsey Hightower
2014-08-03 12:23:15 -07:00
parent 2282f9ce3a
commit 1d3e660248
6 changed files with 175 additions and 179 deletions

View File

@@ -30,40 +30,40 @@ import (
)
type serviceInfo struct {
name string
port int
active bool
listener net.Listener
lock sync.Mutex
mu sync.Mutex // protects active
active bool
}
// Proxier is a simple proxy for tcp connections between a localhost:lport and services that provide
// the actual implementations.
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
// protects 'serviceMap'
serviceLock sync.Mutex
}
// NewProxier returns a newly created and correctly initialized instance of Proxier.
// NewProxier returns a new Proxier given a LoadBalancer.
func NewProxier(loadBalancer LoadBalancer) *Proxier {
return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]*serviceInfo)}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[string]*serviceInfo),
}
}
func copyBytes(in, out *net.TCPConn) {
glog.Infof("Copying from %v <-> %v <-> %v <-> %v",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
_, err := io.Copy(in, out)
if err != nil {
if _, err := io.Copy(in, out); err != nil {
glog.Errorf("I/O error: %v", err)
}
in.CloseRead()
out.CloseWrite()
}
// proxyConnection creates a bidirectional byte shuffler.
// It copies bytes to/from each connection.
// proxyConnection proxies data bidirectionally between in and out.
func proxyConnection(in, out *net.TCPConn) {
glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
@@ -71,39 +71,43 @@ func proxyConnection(in, out *net.TCPConn) {
go copyBytes(out, in)
}
// StopProxy stops a proxy for the named service. It stops the proxy loop and closes the socket.
// StopProxy stops the proxy for the named service.
func (proxier *Proxier) StopProxy(service string) error {
// TODO: delete from map here?
info, found := proxier.getServiceInfo(service)
if !found {
return fmt.Errorf("unknown service: %s", service)
}
info.lock.Lock()
defer info.lock.Unlock()
return proxier.stopProxyInternal(info)
}
// Requires that info.lock be held before calling.
func (proxier *Proxier) stopProxyInternal(info *serviceInfo) error {
info.mu.Lock()
defer info.mu.Unlock()
if !info.active {
return nil
}
glog.Infof("Removing service: %s", info.name)
info.active = false
return info.listener.Close()
}
func (proxier *Proxier) getServiceInfo(service string) (*serviceInfo, bool) {
proxier.serviceLock.Lock()
defer proxier.serviceLock.Unlock()
proxier.mu.Lock()
defer proxier.mu.Unlock()
info, ok := proxier.serviceMap[service]
return info, ok
}
func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
proxier.serviceLock.Lock()
defer proxier.serviceLock.Unlock()
proxier.mu.Lock()
defer proxier.mu.Unlock()
info.name = service
proxier.serviceMap[service] = info
}
// AcceptHandler begins accepting incoming connections from listener and proxying the connections to the load-balanced endpoints.
// It never returns.
// AcceptHandler proxies incoming connections for the specified service
// to the load-balanced service endpoints.
func (proxier *Proxier) AcceptHandler(service string, listener net.Listener) {
info, found := proxier.getServiceInfo(service)
if !found {
@@ -111,31 +115,26 @@ func (proxier *Proxier) AcceptHandler(service string, listener net.Listener) {
return
}
for {
info.lock.Lock()
info.mu.Lock()
if !info.active {
info.lock.Unlock()
info.mu.Unlock()
break
}
info.lock.Unlock()
info.mu.Unlock()
inConn, err := listener.Accept()
if err != nil {
glog.Errorf("Accept failed: %v", err)
continue
}
glog.Infof("Accepted connection from: %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
// Figure out where this request should go.
endpoint, err := proxier.loadBalancer.LoadBalance(service, inConn.RemoteAddr())
endpoint, err := proxier.loadBalancer.NextEndpoint(service, inConn.RemoteAddr())
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
inConn.Close()
continue
}
glog.Infof("Mapped service %s to endpoint %s", service, endpoint)
outConn, err := net.DialTimeout("tcp", endpoint, time.Duration(5)*time.Second)
// We basically need to take everything from inConn and send to outConn
// and anything coming from outConn needs to be sent to inConn.
if err != nil {
glog.Errorf("Dial failed: %v", err)
inConn.Close()
@@ -145,9 +144,10 @@ func (proxier *Proxier) AcceptHandler(service string, listener net.Listener) {
}
}
// addService starts listening for a new service on a given port.
// addService creates and registers a service proxy for the given service on
// the specified port.
// It returns the net.Listener of the service proxy.
func (proxier *Proxier) addService(service string, port int) (net.Listener, error) {
// Make sure we can start listening on the port before saying all's well.
l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return nil, err
@@ -156,7 +156,7 @@ func (proxier *Proxier) addService(service string, port int) (net.Listener, erro
return l, nil
}
// used to globally lock around unused ports. Only used in testing.
// used to globally lock around unused ports. Only used in testing.
var unusedPortLock sync.Mutex
// addService starts listening for a new service, returning the port it's using.
@@ -164,7 +164,6 @@ var unusedPortLock sync.Mutex
func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) {
unusedPortLock.Lock()
defer unusedPortLock.Unlock()
// Make sure we can start listening on the port before saying all's well.
l, err := net.Listen("tcp", ":0")
if err != nil {
return "", err
@@ -188,24 +187,22 @@ func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) {
func (proxier *Proxier) addServiceCommon(service string, l net.Listener) {
glog.Infof("Listening for %s on %s", service, l.Addr().String())
// If that succeeds, start the accepting loop.
go proxier.AcceptHandler(service, l)
}
// OnUpdate receives update notices for the updated services and start listening newly added services.
// It implements "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config".ServiceConfigHandler.OnUpdate.
func (proxier *Proxier) OnUpdate(services []api.Service) {
// OnUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier Proxier) OnUpdate(services []api.Service) {
glog.Infof("Received update notice: %+v", services)
serviceNames := util.StringSet{}
activeServices := util.StringSet{}
for _, service := range services {
serviceNames.Insert(service.ID)
activeServices.Insert(service.ID)
info, exists := proxier.getServiceInfo(service.ID)
if exists && info.port == service.Port {
continue
}
if exists {
// Stop the old proxier.
proxier.StopProxy(service.ID)
}
glog.Infof("Adding a new service %s on port %d", service.ID, service.Port)
@@ -220,15 +217,11 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
listener: listener,
})
}
proxier.serviceLock.Lock()
defer proxier.serviceLock.Unlock()
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
info.lock.Lock()
if !serviceNames.Has(name) && info.active {
glog.Infof("Removing service: %s", name)
if !activeServices.Has(name) {
proxier.stopProxyInternal(info)
}
info.lock.Unlock()
}
}