Merge pull request #1850 from thockin/nsinteg
Fix etcd in proxy for namespace awareness
This commit is contained in:
		@@ -116,10 +116,50 @@ func (s ConfigSourceEtcd) Run() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// decodeServices recurses from the root of the service storage directory into each namespace to get each service and endpoint object
 | 
			
		||||
func (s ConfigSourceEtcd) decodeServices(node *etcd.Node, retServices []api.Service, retEndpoints []api.Endpoints) ([]api.Service, []api.Endpoints, error) {
 | 
			
		||||
	// TODO this needs to go against API server desperately, so much redundant error prone code here
 | 
			
		||||
	// we hit a namespace boundary, recurse until we find actual nodes
 | 
			
		||||
	if node.Dir == true {
 | 
			
		||||
		for _, n := range node.Nodes {
 | 
			
		||||
			var err error // Don't shadow the ret* variables.
 | 
			
		||||
			retServices, retEndpoints, err = s.decodeServices(n, retServices, retEndpoints)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return retServices, retEndpoints, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return retServices, retEndpoints, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// we have an actual service node
 | 
			
		||||
	var svc api.Service
 | 
			
		||||
	err := latest.Codec.DecodeInto([]byte(node.Value), &svc)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err)
 | 
			
		||||
	} else {
 | 
			
		||||
		// so we got a service we can handle, and now get endpoints
 | 
			
		||||
		retServices = append(retServices, svc)
 | 
			
		||||
		// get the endpoints
 | 
			
		||||
		endpoints, err := s.GetEndpoints(svc.Namespace, svc.ID)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if tools.IsEtcdNotFound(err) {
 | 
			
		||||
				glog.V(4).Infof("Unable to get endpoints for %s %s : %v", svc.Namespace, svc.ID, err)
 | 
			
		||||
			}
 | 
			
		||||
			glog.Errorf("Couldn't get endpoints for %s %s : %v skipping", svc.Namespace, svc.ID, err)
 | 
			
		||||
			endpoints = api.Endpoints{}
 | 
			
		||||
		} else {
 | 
			
		||||
			glog.V(3).Infof("Got service: %s %s on localport %d mapping to: %s", svc.Namespace, svc.ID, svc.Port, endpoints)
 | 
			
		||||
		}
 | 
			
		||||
		retEndpoints = append(retEndpoints, endpoints)
 | 
			
		||||
	}
 | 
			
		||||
	return retServices, retEndpoints, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetServices finds the list of services and their endpoints from etcd.
 | 
			
		||||
// This operation is akin to a set a known good at regular intervals.
 | 
			
		||||
func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) {
 | 
			
		||||
	response, err := s.client.Get(registryRoot+"/specs", true, false)
 | 
			
		||||
	// this is a recursive query now that services are namespaced under "/registry/services/specs/<ns>/<name>"
 | 
			
		||||
	response, err := s.client.Get(registryRoot+"/specs", false, true)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if tools.IsEtcdNotFound(err) {
 | 
			
		||||
			glog.V(4).Infof("Failed to get the key %s: %v", registryRoot, err)
 | 
			
		||||
@@ -128,40 +168,18 @@ func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error)
 | 
			
		||||
		}
 | 
			
		||||
		return []api.Service{}, []api.Endpoints{}, err
 | 
			
		||||
	}
 | 
			
		||||
	// this code needs to go through the API server in the future, this is one big hack
 | 
			
		||||
	if response.Node.Dir == true {
 | 
			
		||||
		retServices := make([]api.Service, len(response.Node.Nodes))
 | 
			
		||||
		retEndpoints := make([]api.Endpoints, len(response.Node.Nodes))
 | 
			
		||||
		// Ok, so we have directories, this list should be the list
 | 
			
		||||
		// of services. Find the local port to listen on and remote endpoints
 | 
			
		||||
		// and create a Service entry for it.
 | 
			
		||||
		for i, node := range response.Node.Nodes {
 | 
			
		||||
			var svc api.Service
 | 
			
		||||
			err = latest.Codec.DecodeInto([]byte(node.Value), &svc)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			retServices[i] = svc
 | 
			
		||||
			endpoints, err := s.GetEndpoints(svc.ID)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				if tools.IsEtcdNotFound(err) {
 | 
			
		||||
					glog.V(4).Infof("Unable to get endpoints for %s : %v", svc.ID, err)
 | 
			
		||||
				}
 | 
			
		||||
				glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err)
 | 
			
		||||
				endpoints = api.Endpoints{}
 | 
			
		||||
			} else {
 | 
			
		||||
				glog.V(3).Infof("Got service: %s on localport %d mapping to: %s", svc.ID, svc.Port, endpoints)
 | 
			
		||||
			}
 | 
			
		||||
			retEndpoints[i] = endpoints
 | 
			
		||||
		}
 | 
			
		||||
		return retServices, retEndpoints, err
 | 
			
		||||
		retServices := []api.Service{}
 | 
			
		||||
		retEndpoints := []api.Endpoints{}
 | 
			
		||||
		return s.decodeServices(response.Node, retServices, retEndpoints)
 | 
			
		||||
	}
 | 
			
		||||
	return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetEndpoints finds the list of endpoints of the service from etcd.
 | 
			
		||||
func (s ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) {
 | 
			
		||||
	key := path.Join(registryRoot, "endpoints", service)
 | 
			
		||||
func (s ConfigSourceEtcd) GetEndpoints(namespace, service string) (api.Endpoints, error) {
 | 
			
		||||
	key := path.Join(registryRoot, "endpoints", namespace, service)
 | 
			
		||||
	response, err := s.client.Get(key, true, false)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to get the key: %s %v", key, err)
 | 
			
		||||
@@ -195,9 +213,12 @@ func (s ConfigSourceEtcd) WatchForChanges() {
 | 
			
		||||
		if !ok {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		// only listen for non directory changes
 | 
			
		||||
		if watchResponse.Node.Dir == false {
 | 
			
		||||
			s.ProcessChange(watchResponse)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s ConfigSourceEtcd) ProcessChange(response *etcd.Response) {
 | 
			
		||||
	glog.V(4).Infof("Processing a change in service configuration... %s", *response)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user