Do interface{} -> runtime.Object rename everywhere

This commit is contained in:
Daniel Smith 2014-09-05 19:22:03 -07:00
parent 1c2b65788d
commit 0d30a656ef
33 changed files with 190 additions and 198 deletions

View File

@ -31,7 +31,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
func validateObject(obj interface{}) (errors []error) { func validateObject(obj runtime.Object) (errors []error) {
switch t := obj.(type) { switch t := obj.(type) {
case *api.ReplicationController: case *api.ReplicationController:
errors = validation.ValidateManifest(&t.DesiredState.PodTemplate.DesiredState.Manifest) errors = validation.ValidateManifest(&t.DesiredState.PodTemplate.DesiredState.Manifest)
@ -85,7 +85,7 @@ func walkJSONFiles(inDir string, fn func(name, path string, data []byte)) error
} }
func TestApiExamples(t *testing.T) { func TestApiExamples(t *testing.T) {
expected := map[string]interface{}{ expected := map[string]runtime.Object{
"controller": &api.ReplicationController{}, "controller": &api.ReplicationController{},
"controller-list": &api.ReplicationControllerList{}, "controller-list": &api.ReplicationControllerList{},
"pod": &api.Pod{}, "pod": &api.Pod{},
@ -120,7 +120,7 @@ func TestApiExamples(t *testing.T) {
} }
func TestExamples(t *testing.T) { func TestExamples(t *testing.T) {
expected := map[string]interface{}{ expected := map[string]runtime.Object{
"frontend-controller": &api.ReplicationController{}, "frontend-controller": &api.ReplicationController{},
"redis-slave-controller": &api.ReplicationController{}, "redis-slave-controller": &api.ReplicationController{},
"redis-master": &api.Pod{}, "redis-master": &api.Pod{},

View File

@ -27,17 +27,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version" "github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/golang/glog" "github.com/golang/glog"
) )
// Codec defines methods for serializing and deserializing API objects.
type Codec interface {
Encode(obj interface{}) (data []byte, err error)
Decode(data []byte) (interface{}, error)
DecodeInto(data []byte, obj interface{}) error
}
// mux is an object that can register http handlers. // mux is an object that can register http handlers.
type mux interface { type mux interface {
Handle(pattern string, handler http.Handler) Handle(pattern string, handler http.Handler)
@ -53,7 +47,7 @@ type defaultAPIServer struct {
// Handle returns a Handler function that expose the provided storage interfaces // Handle returns a Handler function that expose the provided storage interfaces
// as RESTful resources at prefix, serialized by codec, and also includes the support // as RESTful resources at prefix, serialized by codec, and also includes the support
// http resources. // http resources.
func Handle(storage map[string]RESTStorage, codec Codec, prefix string) http.Handler { func Handle(storage map[string]RESTStorage, codec runtime.Codec, prefix string) http.Handler {
group := NewAPIGroup(storage, codec) group := NewAPIGroup(storage, codec)
mux := http.NewServeMux() mux := http.NewServeMux()
@ -78,7 +72,7 @@ type APIGroup struct {
// This is a helper method for registering multiple sets of REST handlers under different // This is a helper method for registering multiple sets of REST handlers under different
// prefixes onto a server. // prefixes onto a server.
// TODO: add multitype codec serialization // TODO: add multitype codec serialization
func NewAPIGroup(storage map[string]RESTStorage, codec Codec) *APIGroup { func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec) *APIGroup {
return &APIGroup{RESTHandler{ return &APIGroup{RESTHandler{
storage: storage, storage: storage,
codec: codec, codec: codec,
@ -147,7 +141,7 @@ func handleVersion(w http.ResponseWriter, req *http.Request) {
} }
// writeJSON renders an object as JSON to the response. // writeJSON renders an object as JSON to the response.
func writeJSON(statusCode int, codec Codec, object interface{}, w http.ResponseWriter) { func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w http.ResponseWriter) {
output, err := codec.Encode(object) output, err := codec.Encode(object)
if err != nil { if err != nil {
errorJSON(err, codec, w) errorJSON(err, codec, w)
@ -159,7 +153,7 @@ func writeJSON(statusCode int, codec Codec, object interface{}, w http.ResponseW
} }
// errorJSON renders an error to the response. // errorJSON renders an error to the response.
func errorJSON(err error, codec Codec, w http.ResponseWriter) { func errorJSON(err error, codec runtime.Codec, w http.ResponseWriter) {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, codec, status, w) writeJSON(status.Code, codec, status, w)
} }

View File

@ -17,18 +17,19 @@ limitations under the License.
package apiserver package apiserver
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
// WorkFunc is used to perform any time consuming work for an api call, after // WorkFunc is used to perform any time consuming work for an api call, after
// the input has been validated. Pass one of these to MakeAsync to create an // the input has been validated. Pass one of these to MakeAsync to create an
// appropriate return value for the Update, Delete, and Create methods. // appropriate return value for the Update, Delete, and Create methods.
type WorkFunc func() (result interface{}, err error) type WorkFunc func() (result runtime.Object, err error)
// MakeAsync takes a function and executes it, delivering the result in the way required // MakeAsync takes a function and executes it, delivering the result in the way required
// by RESTStorage's Update, Delete, and Create methods. // by RESTStorage's Update, Delete, and Create methods.
func MakeAsync(fn WorkFunc) <-chan interface{} { func MakeAsync(fn WorkFunc) <-chan runtime.Object {
channel := make(chan interface{}) channel := make(chan runtime.Object)
go func() { go func() {
defer util.HandleCrash() defer util.HandleCrash()
obj, err := fn() obj, err := fn()

View File

@ -18,6 +18,7 @@ package apiserver
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
@ -25,25 +26,25 @@ import (
// Resources which are exported to the RESTful API of apiserver need to implement this interface. // Resources which are exported to the RESTful API of apiserver need to implement this interface.
type RESTStorage interface { type RESTStorage interface {
// New returns an empty object that can be used with Create and Update after request data has been put into it. // New returns an empty object that can be used with Create and Update after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, interface{}) // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() interface{} New() runtime.Object
// List selects resources in the storage which match to the selector. // List selects resources in the storage which match to the selector.
// TODO: add field selector in addition to label selector. // TODO: add field selector in addition to label selector.
List(labels.Selector) (interface{}, error) List(labels.Selector) (runtime.Object, error)
// Get finds a resource in the storage by id and returns it. // Get finds a resource in the storage by id and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the // Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found. // returned error value err when the specified resource is not found.
Get(id string) (interface{}, error) Get(id string) (runtime.Object, error)
// Delete finds a resource in the storage and deletes it. // Delete finds a resource in the storage and deletes it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the // Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found. // returned error value err when the specified resource is not found.
Delete(id string) (<-chan interface{}, error) Delete(id string) (<-chan runtime.Object, error)
Create(interface{}) (<-chan interface{}, error) Create(runtime.Object) (<-chan runtime.Object, error)
Update(interface{}) (<-chan interface{}, error) Update(runtime.Object) (<-chan runtime.Object, error)
} }
// ResourceWatcher should be implemented by all RESTStorage objects that // ResourceWatcher should be implemented by all RESTStorage objects that

View File

@ -25,12 +25,13 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
type OperationHandler struct { type OperationHandler struct {
ops *Operations ops *Operations
codec Codec codec runtime.Codec
} }
func (h *OperationHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (h *OperationHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -63,8 +64,8 @@ func (h *OperationHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Operation represents an ongoing action which the server is performing. // Operation represents an ongoing action which the server is performing.
type Operation struct { type Operation struct {
ID string ID string
result interface{} result runtime.Object
awaiting <-chan interface{} awaiting <-chan runtime.Object
finished *time.Time finished *time.Time
lock sync.Mutex lock sync.Mutex
notify chan struct{} notify chan struct{}
@ -90,7 +91,7 @@ func NewOperations() *Operations {
} }
// NewOperation adds a new operation. It is lock-free. // NewOperation adds a new operation. It is lock-free.
func (ops *Operations) NewOperation(from <-chan interface{}) *Operation { func (ops *Operations) NewOperation(from <-chan runtime.Object) *Operation {
id := atomic.AddInt64(&ops.lastID, 1) id := atomic.AddInt64(&ops.lastID, 1)
op := &Operation{ op := &Operation{
ID: strconv.FormatInt(id, 10), ID: strconv.FormatInt(id, 10),
@ -110,7 +111,7 @@ func (ops *Operations) insert(op *Operation) {
} }
// List lists operations for an API client. // List lists operations for an API client.
func (ops *Operations) List() api.ServerOpList { func (ops *Operations) List() *api.ServerOpList {
ops.lock.Lock() ops.lock.Lock()
defer ops.lock.Unlock() defer ops.lock.Unlock()
@ -119,7 +120,7 @@ func (ops *Operations) List() api.ServerOpList {
ids = append(ids, id) ids = append(ids, id)
} }
sort.StringSlice(ids).Sort() sort.StringSlice(ids).Sort()
ol := api.ServerOpList{} ol := &api.ServerOpList{}
for _, id := range ids { for _, id := range ids {
ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}}) ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}})
} }
@ -185,7 +186,7 @@ func (op *Operation) expired(limitTime time.Time) bool {
// StatusOrResult returns status information or the result of the operation if it is complete, // StatusOrResult returns status information or the result of the operation if it is complete,
// with a bool indicating true in the latter case. // with a bool indicating true in the latter case.
func (op *Operation) StatusOrResult() (description interface{}, finished bool) { func (op *Operation) StatusOrResult() (description runtime.Object, finished bool) {
op.lock.Lock() op.lock.Lock()
defer op.lock.Unlock() defer op.lock.Unlock()

View File

@ -20,11 +20,12 @@ import (
"net/http" "net/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
) )
type RedirectHandler struct { type RedirectHandler struct {
storage map[string]RESTStorage storage map[string]RESTStorage
codec Codec codec runtime.Codec
} }
func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {

View File

@ -23,11 +23,12 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
) )
type RESTHandler struct { type RESTHandler struct {
storage map[string]RESTStorage storage map[string]RESTStorage
codec Codec codec runtime.Codec
ops *Operations ops *Operations
asyncOpWait time.Duration asyncOpWait time.Duration
} }
@ -158,7 +159,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
} }
// createOperation creates an operation to process a channel response. // createOperation creates an operation to process a channel response.
func (h *RESTHandler) createOperation(out <-chan interface{}, sync bool, timeout time.Duration) *Operation { func (h *RESTHandler) createOperation(out <-chan runtime.Object, sync bool, timeout time.Duration) *Operation {
op := h.ops.NewOperation(out) op := h.ops.NewOperation(out)
if sync { if sync {
op.WaitFor(timeout) op.WaitFor(timeout)
@ -175,11 +176,6 @@ func (h *RESTHandler) finishReq(op *Operation, w http.ResponseWriter) {
if complete { if complete {
status := http.StatusOK status := http.StatusOK
switch stat := obj.(type) { switch stat := obj.(type) {
case api.Status:
httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
if stat.Code != 0 {
status = stat.Code
}
case *api.Status: case *api.Status:
if stat.Code != 0 { if stat.Code != 0 {
status = stat.Code status = stat.Code

View File

@ -32,7 +32,7 @@ import (
type WatchHandler struct { type WatchHandler struct {
storage map[string]RESTStorage storage map[string]RESTStorage
codec Codec codec runtime.Codec
} }
func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) { func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) {

View File

@ -187,7 +187,8 @@ func (r *Request) Timeout(d time.Duration) *Request {
// If obj is a string, try to read a file of that name. // If obj is a string, try to read a file of that name.
// If obj is a []byte, send it directly. // If obj is a []byte, send it directly.
// If obj is an io.Reader, use it directly. // If obj is an io.Reader, use it directly.
// Otherwise, assume obj is an api type and marshall it correctly. // If obj is a runtime.Object, marshal it correctly.
// Otherwise, set an error.
func (r *Request) Body(obj interface{}) *Request { func (r *Request) Body(obj interface{}) *Request {
if r.err != nil { if r.err != nil {
return r return r
@ -204,13 +205,15 @@ func (r *Request) Body(obj interface{}) *Request {
r.body = bytes.NewBuffer(t) r.body = bytes.NewBuffer(t)
case io.Reader: case io.Reader:
r.body = t r.body = t
default: case runtime.Object:
data, err := runtime.DefaultCodec.Encode(obj) data, err := runtime.DefaultCodec.Encode(t)
if err != nil { if err != nil {
r.err = err r.err = err
return r return r
} }
r.body = bytes.NewBuffer(data) r.body = bytes.NewBuffer(data)
default:
r.err = fmt.Errorf("Unknown type used for body: %#v", obj)
} }
return r return r
} }
@ -314,7 +317,7 @@ func (r Result) Raw() ([]byte, error) {
} }
// Get returns the result as an object. // Get returns the result as an object.
func (r Result) Get() (interface{}, error) { func (r Result) Get() (runtime.Object, error) {
if r.err != nil { if r.err != nil {
return nil, r.err return nil, r.err
} }
@ -322,7 +325,7 @@ func (r Result) Get() (interface{}, error) {
} }
// Into stores the result into obj, if possible. // Into stores the result into obj, if possible.
func (r Result) Into(obj interface{}) error { func (r Result) Into(obj runtime.Object) error {
if r.err != nil { if r.err != nil {
return r.err return r.err
} }

View File

@ -43,7 +43,7 @@ func (p *Parser) ToWireFormat(data []byte, storage string) ([]byte, error) {
return nil, fmt.Errorf("unknown storage type: %v", storage) return nil, fmt.Errorf("unknown storage type: %v", storage)
} }
obj := reflect.New(prototypeType).Interface() obj := reflect.New(prototypeType).Interface().(runtime.Object)
err := runtime.DefaultCodec.DecodeInto(data, obj) err := runtime.DefaultCodec.DecodeInto(data, obj)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -53,7 +53,7 @@ func (s *ProxyServer) Serve() error {
func (s *ProxyServer) doError(w http.ResponseWriter, err error) { func (s *ProxyServer) doError(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
w.Header().Add("Content-type", "application/json") w.Header().Add("Content-type", "application/json")
data, _ := runtime.DefaultCodec.Encode(api.Status{ data, _ := runtime.DefaultCodec.Encode(&api.Status{
Status: api.StatusFailure, Status: api.StatusFailure,
Message: fmt.Sprintf("internal error: %#v", err), Message: fmt.Sprintf("internal error: %#v", err),
}) })

View File

@ -36,7 +36,7 @@ import (
type ResourcePrinter interface { type ResourcePrinter interface {
// Print receives an arbitrary JSON body, formats it and prints it to a writer. // Print receives an arbitrary JSON body, formats it and prints it to a writer.
Print([]byte, io.Writer) error Print([]byte, io.Writer) error
PrintObj(interface{}, io.Writer) error PrintObj(runtime.Object, io.Writer) error
} }
// IdentityPrinter is an implementation of ResourcePrinter which simply copies the body out to the output stream. // IdentityPrinter is an implementation of ResourcePrinter which simply copies the body out to the output stream.
@ -49,7 +49,7 @@ func (i *IdentityPrinter) Print(data []byte, w io.Writer) error {
} }
// PrintObj is an implementation of ResourcePrinter.PrintObj which simply writes the object to the Writer. // PrintObj is an implementation of ResourcePrinter.PrintObj which simply writes the object to the Writer.
func (i *IdentityPrinter) PrintObj(obj interface{}, output io.Writer) error { func (i *IdentityPrinter) PrintObj(obj runtime.Object, output io.Writer) error {
data, err := runtime.DefaultCodec.Encode(obj) data, err := runtime.DefaultCodec.Encode(obj)
if err != nil { if err != nil {
return err return err
@ -62,7 +62,7 @@ type YAMLPrinter struct{}
// Print parses the data as JSON, re-formats as YAML and prints the YAML. // Print parses the data as JSON, re-formats as YAML and prints the YAML.
func (y *YAMLPrinter) Print(data []byte, w io.Writer) error { func (y *YAMLPrinter) Print(data []byte, w io.Writer) error {
var obj interface{} var obj runtime.Object
if err := json.Unmarshal(data, &obj); err != nil { if err := json.Unmarshal(data, &obj); err != nil {
return err return err
} }
@ -75,7 +75,7 @@ func (y *YAMLPrinter) Print(data []byte, w io.Writer) error {
} }
// PrintObj prints the data as YAML. // PrintObj prints the data as YAML.
func (y *YAMLPrinter) PrintObj(obj interface{}, w io.Writer) error { func (y *YAMLPrinter) PrintObj(obj runtime.Object, w io.Writer) error {
output, err := yaml.Marshal(obj) output, err := yaml.Marshal(obj)
if err != nil { if err != nil {
return err return err
@ -251,7 +251,7 @@ func printStatus(status *api.Status, w io.Writer) error {
// Print parses the data as JSON, then prints the parsed data in a human-friendly // Print parses the data as JSON, then prints the parsed data in a human-friendly
// format according to the type of the data. // format according to the type of the data.
func (h *HumanReadablePrinter) Print(data []byte, output io.Writer) error { func (h *HumanReadablePrinter) Print(data []byte, output io.Writer) error {
var mapObj map[string]interface{} var mapObj map[string]runtime.Object
if err := json.Unmarshal([]byte(data), &mapObj); err != nil { if err := json.Unmarshal([]byte(data), &mapObj); err != nil {
return err return err
} }
@ -268,7 +268,7 @@ func (h *HumanReadablePrinter) Print(data []byte, output io.Writer) error {
} }
// PrintObj prints the obj in a human-friendly format according to the type of the obj. // PrintObj prints the obj in a human-friendly format according to the type of the obj.
func (h *HumanReadablePrinter) PrintObj(obj interface{}, output io.Writer) error { func (h *HumanReadablePrinter) PrintObj(obj runtime.Object, output io.Writer) error {
w := tabwriter.NewWriter(output, 20, 5, 3, ' ', 0) w := tabwriter.NewWriter(output, 20, 5, 3, ' ', 0)
defer w.Flush() defer w.Flush()
if handler := h.handlerMap[reflect.TypeOf(obj)]; handler != nil { if handler := h.handlerMap[reflect.TypeOf(obj)]; handler != nil {
@ -300,6 +300,6 @@ func (t *TemplatePrinter) Print(data []byte, w io.Writer) error {
} }
// PrintObj formats the obj with the Go Template. // PrintObj formats the obj with the Go Template.
func (t *TemplatePrinter) PrintObj(obj interface{}, w io.Writer) error { func (t *TemplatePrinter) PrintObj(obj runtime.Object, w io.Writer) error {
return t.Template.Execute(w, obj) return t.Template.Execute(w, obj)
} }

View File

@ -131,7 +131,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
} }
// API_v1beta1 returns the resources and codec for API version v1beta1. // API_v1beta1 returns the resources and codec for API version v1beta1.
func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, apiserver.Codec) { func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec) {
storage := make(map[string]apiserver.RESTStorage) storage := make(map[string]apiserver.RESTStorage)
for k, v := range m.storage { for k, v := range m.storage {
storage[k] = v storage[k] = v

View File

@ -23,6 +23,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
) )
// BindingStorage implements the RESTStorage interface. When bindings are written, it // BindingStorage implements the RESTStorage interface. When bindings are written, it
@ -40,32 +41,32 @@ func NewBindingStorage(bindingRegistry Registry) *BindingStorage {
} }
// List returns an error because bindings are write-only objects. // List returns an error because bindings are write-only objects.
func (*BindingStorage) List(selector labels.Selector) (interface{}, error) { func (*BindingStorage) List(selector labels.Selector) (runtime.Object, error) {
return nil, errors.NewNotFound("binding", "list") return nil, errors.NewNotFound("binding", "list")
} }
// Get returns an error because bindings are write-only objects. // Get returns an error because bindings are write-only objects.
func (*BindingStorage) Get(id string) (interface{}, error) { func (*BindingStorage) Get(id string) (runtime.Object, error) {
return nil, errors.NewNotFound("binding", id) return nil, errors.NewNotFound("binding", id)
} }
// Delete returns an error because bindings are write-only objects. // Delete returns an error because bindings are write-only objects.
func (*BindingStorage) Delete(id string) (<-chan interface{}, error) { func (*BindingStorage) Delete(id string) (<-chan runtime.Object, error) {
return nil, errors.NewNotFound("binding", id) return nil, errors.NewNotFound("binding", id)
} }
// New returns a new binding object fit for having data unmarshalled into it. // New returns a new binding object fit for having data unmarshalled into it.
func (*BindingStorage) New() interface{} { func (*BindingStorage) New() runtime.Object {
return &api.Binding{} return &api.Binding{}
} }
// Create attempts to make the assignment indicated by the binding it recieves. // Create attempts to make the assignment indicated by the binding it recieves.
func (b *BindingStorage) Create(obj interface{}) (<-chan interface{}, error) { func (b *BindingStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) {
binding, ok := obj.(*api.Binding) binding, ok := obj.(*api.Binding)
if !ok { if !ok {
return nil, fmt.Errorf("incorrect type: %#v", obj) return nil, fmt.Errorf("incorrect type: %#v", obj)
} }
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
if err := b.registry.ApplyBinding(binding); err != nil { if err := b.registry.ApplyBinding(binding); err != nil {
return nil, err return nil, err
} }
@ -74,6 +75,6 @@ func (b *BindingStorage) Create(obj interface{}) (<-chan interface{}, error) {
} }
// Update returns an error-- this object may not be updated. // Update returns an error-- this object may not be updated.
func (b *BindingStorage) Update(obj interface{}) (<-chan interface{}, error) { func (b *BindingStorage) Update(obj runtime.Object) (<-chan runtime.Object, error) {
return nil, fmt.Errorf("Bindings may not be changed.") return nil, fmt.Errorf("Bindings may not be changed.")
} }

View File

@ -26,7 +26,7 @@ type Registry interface {
ListControllers() (*api.ReplicationControllerList, error) ListControllers() (*api.ReplicationControllerList, error)
WatchControllers(resourceVersion uint64) (watch.Interface, error) WatchControllers(resourceVersion uint64) (watch.Interface, error)
GetController(controllerID string) (*api.ReplicationController, error) GetController(controllerID string) (*api.ReplicationController, error)
CreateController(controller api.ReplicationController) error CreateController(controller *api.ReplicationController) error
UpdateController(controller api.ReplicationController) error UpdateController(controller *api.ReplicationController) error
DeleteController(controllerID string) error DeleteController(controllerID string) error
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -51,7 +52,7 @@ func NewRegistryStorage(registry Registry, podRegistry pod.Registry) apiserver.R
} }
// Create registers the given ReplicationController. // Create registers the given ReplicationController.
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { func (rs *RegistryStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) {
controller, ok := obj.(*api.ReplicationController) controller, ok := obj.(*api.ReplicationController)
if !ok { if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj) return nil, fmt.Errorf("not a replication controller: %#v", obj)
@ -67,8 +68,8 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
controller.CreationTimestamp = util.Now() controller.CreationTimestamp = util.Now()
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.CreateController(*controller) err := rs.registry.CreateController(controller)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -77,14 +78,14 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
} }
// Delete asynchronously deletes the ReplicationController specified by its id. // Delete asynchronously deletes the ReplicationController specified by its id.
func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { func (rs *RegistryStorage) Delete(id string) (<-chan runtime.Object, error) {
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id)
}), nil }), nil
} }
// Get obtains the ReplicationController specified by its id. // Get obtains the ReplicationController specified by its id.
func (rs *RegistryStorage) Get(id string) (interface{}, error) { func (rs *RegistryStorage) Get(id string) (runtime.Object, error) {
controller, err := rs.registry.GetController(id) controller, err := rs.registry.GetController(id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -93,7 +94,7 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) {
} }
// List obtains a list of ReplicationControllers that match selector. // List obtains a list of ReplicationControllers that match selector.
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { func (rs *RegistryStorage) List(selector labels.Selector) (runtime.Object, error) {
controllers, err := rs.registry.ListControllers() controllers, err := rs.registry.ListControllers()
if err != nil { if err != nil {
return nil, err return nil, err
@ -109,13 +110,13 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
} }
// New creates a new ReplicationController for use with Create and Update. // New creates a new ReplicationController for use with Create and Update.
func (rs RegistryStorage) New() interface{} { func (rs RegistryStorage) New() runtime.Object {
return &api.ReplicationController{} return &api.ReplicationController{}
} }
// Update replaces a given ReplicationController instance with an existing // Update replaces a given ReplicationController instance with an existing
// instance in storage.registry. // instance in storage.registry.
func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { func (rs *RegistryStorage) Update(obj runtime.Object) (<-chan runtime.Object, error) {
controller, ok := obj.(*api.ReplicationController) controller, ok := obj.(*api.ReplicationController)
if !ok { if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj) return nil, fmt.Errorf("not a replication controller: %#v", obj)
@ -123,8 +124,8 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
if errs := validation.ValidateReplicationController(controller); len(errs) > 0 { if errs := validation.ValidateReplicationController(controller); len(errs) > 0 {
return nil, errors.NewInvalid("replicationController", controller.ID, errs) return nil, errors.NewInvalid("replicationController", controller.ID, errs)
} }
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.UpdateController(*controller) err := rs.registry.UpdateController(controller)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -148,7 +149,7 @@ func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion u
}), nil }), nil
} }
func (rs *RegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) { func (rs *RegistryStorage) waitForController(ctrl *api.ReplicationController) (runtime.Object, error) {
for { for {
pods, err := rs.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) pods, err := rs.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector())
if err != nil { if err != nil {

View File

@ -27,5 +27,5 @@ type Registry interface {
ListEndpoints() (*api.EndpointsList, error) ListEndpoints() (*api.EndpointsList, error)
GetEndpoints(name string) (*api.Endpoints, error) GetEndpoints(name string) (*api.Endpoints, error)
WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
UpdateEndpoints(e api.Endpoints) error UpdateEndpoints(e *api.Endpoints) error
} }

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
@ -38,12 +39,12 @@ func NewStorage(registry Registry) apiserver.RESTStorage {
} }
// Get satisfies the RESTStorage interface. // Get satisfies the RESTStorage interface.
func (rs *Storage) Get(id string) (interface{}, error) { func (rs *Storage) Get(id string) (runtime.Object, error) {
return rs.registry.GetEndpoints(id) return rs.registry.GetEndpoints(id)
} }
// List satisfies the RESTStorage interface. // List satisfies the RESTStorage interface.
func (rs *Storage) List(selector labels.Selector) (interface{}, error) { func (rs *Storage) List(selector labels.Selector) (runtime.Object, error) {
if !selector.Empty() { if !selector.Empty() {
return nil, errors.New("label selectors are not supported on endpoints") return nil, errors.New("label selectors are not supported on endpoints")
} }
@ -57,21 +58,21 @@ func (rs *Storage) Watch(label, field labels.Selector, resourceVersion uint64) (
} }
// Create satisfies the RESTStorage interface but is unimplemented. // Create satisfies the RESTStorage interface but is unimplemented.
func (rs *Storage) Create(obj interface{}) (<-chan interface{}, error) { func (rs *Storage) Create(obj runtime.Object) (<-chan runtime.Object, error) {
return nil, errors.New("unimplemented") return nil, errors.New("unimplemented")
} }
// Update satisfies the RESTStorage interface but is unimplemented. // Update satisfies the RESTStorage interface but is unimplemented.
func (rs *Storage) Update(obj interface{}) (<-chan interface{}, error) { func (rs *Storage) Update(obj runtime.Object) (<-chan runtime.Object, error) {
return nil, errors.New("unimplemented") return nil, errors.New("unimplemented")
} }
// Delete satisfies the RESTStorage interface but is unimplemented. // Delete satisfies the RESTStorage interface but is unimplemented.
func (rs *Storage) Delete(id string) (<-chan interface{}, error) { func (rs *Storage) Delete(id string) (<-chan runtime.Object, error) {
return nil, errors.New("unimplemented") return nil, errors.New("unimplemented")
} }
// New implements the RESTStorage interface. // New implements the RESTStorage interface.
func (rs Storage) New() interface{} { func (rs Storage) New() runtime.Object {
return &api.Endpoints{} return &api.Endpoints{}
} }

View File

@ -82,7 +82,7 @@ func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) {
// WatchPods begins watching for new, changed, or deleted pods. // WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool { return r.WatchList("/registry/pods", resourceVersion, func(obj runtime.Object) bool {
pod, ok := obj.(*api.Pod) pod, ok := obj.(*api.Pod)
if !ok { if !ok {
glog.Errorf("Unexpected object during pod watch: %#v", obj) glog.Errorf("Unexpected object during pod watch: %#v", obj)
@ -110,14 +110,14 @@ func makeContainerKey(machine string) string {
} }
// CreatePod creates a pod based on a specification. // CreatePod creates a pod based on a specification.
func (r *Registry) CreatePod(pod api.Pod) error { func (r *Registry) CreatePod(pod *api.Pod) error {
// Set current status to "Waiting". // Set current status to "Waiting".
pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Status = api.PodWaiting
pod.CurrentState.Host = "" pod.CurrentState.Host = ""
// DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling. // DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling.
pod.DesiredState.Status = api.PodRunning pod.DesiredState.Status = api.PodRunning
pod.DesiredState.Host = "" pod.DesiredState.Host = ""
return r.CreateObj(makePodKey(pod.ID), &pod) return r.CreateObj(makePodKey(pod.ID), pod)
} }
// ApplyBinding implements binding's registry // ApplyBinding implements binding's registry
@ -129,7 +129,7 @@ func (r *Registry) ApplyBinding(binding *api.Binding) error {
// Returns the current state of the pod, or an error. // Returns the current state of the pod, or an error.
func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *api.Pod, err error) { func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *api.Pod, err error) {
podKey := makePodKey(podID) podKey := makePodKey(podID)
err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) { err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj runtime.Object) (runtime.Object, error) {
pod, ok := obj.(*api.Pod) pod, ok := obj.(*api.Pod)
if !ok { if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj) return nil, fmt.Errorf("unexpected object: %#v", obj)
@ -156,13 +156,13 @@ func (r *Registry) assignPod(podID string, machine string) error {
return err return err
} }
contKey := makeContainerKey(machine) contKey := makeContainerKey(machine)
err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) {
manifests := *in.(*api.ContainerManifestList) manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest) manifests.Items = append(manifests.Items, manifest)
if !constraint.Allowed(manifests.Items) { if !constraint.Allowed(manifests.Items) {
return nil, fmt.Errorf("The assignment would cause a constraint violation") return nil, fmt.Errorf("The assignment would cause a constraint violation")
} }
return manifests, nil return &manifests, nil
}) })
if err != nil { if err != nil {
// Put the pod's host back the way it was. This is a terrible hack that // Put the pod's host back the way it was. This is a terrible hack that
@ -174,7 +174,7 @@ func (r *Registry) assignPod(podID string, machine string) error {
return err return err
} }
func (r *Registry) UpdatePod(pod api.Pod) error { func (r *Registry) UpdatePod(pod *api.Pod) error {
return fmt.Errorf("unimplemented!") return fmt.Errorf("unimplemented!")
} }
@ -205,7 +205,7 @@ func (r *Registry) DeletePod(podID string) error {
} }
// Next, remove the pod from the machine atomically. // Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine) contKey := makeContainerKey(machine)
return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) {
manifests := in.(*api.ContainerManifestList) manifests := in.(*api.ContainerManifestList)
newManifests := make([]api.ContainerManifest, 0, len(manifests.Items)) newManifests := make([]api.ContainerManifest, 0, len(manifests.Items))
found := false found := false
@ -258,7 +258,7 @@ func (r *Registry) GetController(controllerID string) (*api.ReplicationControlle
} }
// CreateController creates a new ReplicationController. // CreateController creates a new ReplicationController.
func (r *Registry) CreateController(controller api.ReplicationController) error { func (r *Registry) CreateController(controller *api.ReplicationController) error {
err := r.CreateObj(makeControllerKey(controller.ID), controller) err := r.CreateObj(makeControllerKey(controller.ID), controller)
if tools.IsEtcdNodeExist(err) { if tools.IsEtcdNodeExist(err) {
return errors.NewAlreadyExists("replicationController", controller.ID) return errors.NewAlreadyExists("replicationController", controller.ID)
@ -267,8 +267,8 @@ func (r *Registry) CreateController(controller api.ReplicationController) error
} }
// UpdateController replaces an existing ReplicationController. // UpdateController replaces an existing ReplicationController.
func (r *Registry) UpdateController(controller api.ReplicationController) error { func (r *Registry) UpdateController(controller *api.ReplicationController) error {
return r.SetObj(makeControllerKey(controller.ID), &controller) return r.SetObj(makeControllerKey(controller.ID), controller)
} }
// DeleteController deletes a ReplicationController specified by its ID. // DeleteController deletes a ReplicationController specified by its ID.
@ -293,7 +293,7 @@ func (r *Registry) ListServices() (*api.ServiceList, error) {
} }
// CreateService creates a new Service. // CreateService creates a new Service.
func (r *Registry) CreateService(svc api.Service) error { func (r *Registry) CreateService(svc *api.Service) error {
err := r.CreateObj(makeServiceKey(svc.ID), svc) err := r.CreateObj(makeServiceKey(svc.ID), svc)
if tools.IsEtcdNodeExist(err) { if tools.IsEtcdNodeExist(err) {
return errors.NewAlreadyExists("service", svc.ID) return errors.NewAlreadyExists("service", svc.ID)
@ -352,8 +352,8 @@ func (r *Registry) DeleteService(name string) error {
} }
// UpdateService replaces an existing Service. // UpdateService replaces an existing Service.
func (r *Registry) UpdateService(svc api.Service) error { func (r *Registry) UpdateService(svc *api.Service) error {
return r.SetObj(makeServiceKey(svc.ID), &svc) return r.SetObj(makeServiceKey(svc.ID), svc)
} }
// WatchServices begins watching for new, changed, or deleted service configurations. // WatchServices begins watching for new, changed, or deleted service configurations.
@ -378,10 +378,10 @@ func (r *Registry) ListEndpoints() (*api.EndpointsList, error) {
} }
// UpdateEndpoints update Endpoints of a Service. // UpdateEndpoints update Endpoints of a Service.
func (r *Registry) UpdateEndpoints(e api.Endpoints) error { func (r *Registry) UpdateEndpoints(e *api.Endpoints) error {
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{},
func(input interface{}) (interface{}, error) { func(input runtime.Object) (runtime.Object, error) {
// TODO: racy - label query is returning different results for two simultaneous updaters // TODO: racy - label query is returning different results for two simultaneous updaters
return e, nil return e, nil
}) })

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
@ -37,7 +38,7 @@ func NewRegistryStorage(m Registry) apiserver.RESTStorage {
} }
} }
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { func (rs *RegistryStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) {
minion, ok := obj.(*api.Minion) minion, ok := obj.(*api.Minion)
if !ok { if !ok {
return nil, fmt.Errorf("not a minion: %#v", obj) return nil, fmt.Errorf("not a minion: %#v", obj)
@ -48,7 +49,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
minion.CreationTimestamp = util.Now() minion.CreationTimestamp = util.Now()
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.Insert(minion.ID) err := rs.registry.Insert(minion.ID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -64,7 +65,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
}), nil }), nil
} }
func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { func (rs *RegistryStorage) Delete(id string) (<-chan runtime.Object, error) {
exists, err := rs.registry.Contains(id) exists, err := rs.registry.Contains(id)
if !exists { if !exists {
return nil, ErrDoesNotExist return nil, ErrDoesNotExist
@ -72,12 +73,12 @@ func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(id) return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(id)
}), nil }), nil
} }
func (rs *RegistryStorage) Get(id string) (interface{}, error) { func (rs *RegistryStorage) Get(id string) (runtime.Object, error) {
exists, err := rs.registry.Contains(id) exists, err := rs.registry.Contains(id)
if !exists { if !exists {
return nil, ErrDoesNotExist return nil, ErrDoesNotExist
@ -85,26 +86,26 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) {
return rs.toApiMinion(id), err return rs.toApiMinion(id), err
} }
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { func (rs *RegistryStorage) List(selector labels.Selector) (runtime.Object, error) {
nameList, err := rs.registry.List() nameList, err := rs.registry.List()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var list api.MinionList var list api.MinionList
for _, name := range nameList { for _, name := range nameList {
list.Items = append(list.Items, rs.toApiMinion(name)) list.Items = append(list.Items, *rs.toApiMinion(name))
} }
return list, nil return &list, nil
} }
func (rs RegistryStorage) New() interface{} { func (rs RegistryStorage) New() runtime.Object {
return &api.Minion{} return &api.Minion{}
} }
func (rs *RegistryStorage) Update(minion interface{}) (<-chan interface{}, error) { func (rs *RegistryStorage) Update(minion runtime.Object) (<-chan runtime.Object, error) {
return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.") return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.")
} }
func (rs *RegistryStorage) toApiMinion(name string) api.Minion { func (rs *RegistryStorage) toApiMinion(name string) *api.Minion {
return api.Minion{JSONBase: api.JSONBase{ID: name}} return &api.Minion{JSONBase: api.JSONBase{ID: name}}
} }

View File

@ -31,9 +31,9 @@ type Registry interface {
// Get a specific pod // Get a specific pod
GetPod(podID string) (*api.Pod, error) GetPod(podID string) (*api.Pod, error)
// Create a pod based on a specification. // Create a pod based on a specification.
CreatePod(pod api.Pod) error CreatePod(pod *api.Pod) error
// Update an existing pod // Update an existing pod
UpdatePod(pod api.Pod) error UpdatePod(pod *api.Pod) error
// Delete an existing pod // Delete an existing pod
DeletePod(podID string) error DeletePod(podID string) error
} }

View File

@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -64,7 +65,7 @@ func NewRegistryStorage(config *RegistryStorageConfig) apiserver.RESTStorage {
} }
} }
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { func (rs *RegistryStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) {
pod := obj.(*api.Pod) pod := obj.(*api.Pod)
if len(pod.ID) == 0 { if len(pod.ID) == 0 {
pod.ID = uuid.NewUUID().String() pod.ID = uuid.NewUUID().String()
@ -76,21 +77,21 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
pod.CreationTimestamp = util.Now() pod.CreationTimestamp = util.Now()
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
if err := rs.registry.CreatePod(*pod); err != nil { if err := rs.registry.CreatePod(pod); err != nil {
return nil, err return nil, err
} }
return rs.registry.GetPod(pod.ID) return rs.registry.GetPod(pod.ID)
}), nil }), nil
} }
func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { func (rs *RegistryStorage) Delete(id string) (<-chan runtime.Object, error) {
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id)
}), nil }), nil
} }
func (rs *RegistryStorage) Get(id string) (interface{}, error) { func (rs *RegistryStorage) Get(id string) (runtime.Object, error) {
pod, err := rs.registry.GetPod(id) pod, err := rs.registry.GetPod(id)
if err != nil { if err != nil {
return pod, err return pod, err
@ -106,7 +107,7 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) {
return pod, err return pod, err
} }
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { func (rs *RegistryStorage) List(selector labels.Selector) (runtime.Object, error) {
pods, err := rs.registry.ListPods(selector) pods, err := rs.registry.ListPods(selector)
if err == nil { if err == nil {
for i := range pods.Items { for i := range pods.Items {
@ -131,17 +132,17 @@ func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion u
}) })
} }
func (rs RegistryStorage) New() interface{} { func (rs RegistryStorage) New() runtime.Object {
return &api.Pod{} return &api.Pod{}
} }
func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { func (rs *RegistryStorage) Update(obj runtime.Object) (<-chan runtime.Object, error) {
pod := obj.(*api.Pod) pod := obj.(*api.Pod)
if errs := validation.ValidatePod(pod); len(errs) > 0 { if errs := validation.ValidatePod(pod); len(errs) > 0 {
return nil, errors.NewInvalid("pod", pod.ID, errs) return nil, errors.NewInvalid("pod", pod.ID, errs)
} }
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
if err := rs.registry.UpdatePod(*pod); err != nil { if err := rs.registry.UpdatePod(pod); err != nil {
return nil, err return nil, err
} }
return rs.registry.GetPod(pod.ID) return rs.registry.GetPod(pod.ID)
@ -235,7 +236,7 @@ func getPodStatus(pod *api.Pod) api.PodStatus {
} }
} }
func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { func (rs *RegistryStorage) waitForPodRunning(pod *api.Pod) (runtime.Object, error) {
for { for {
podObj, err := rs.Get(pod.ID) podObj, err := rs.Get(pod.ID)
if err != nil || podObj == nil { if err != nil || podObj == nil {

View File

@ -26,10 +26,10 @@ import (
// Registry is an interface for things that know how to store services. // Registry is an interface for things that know how to store services.
type Registry interface { type Registry interface {
ListServices() (*api.ServiceList, error) ListServices() (*api.ServiceList, error)
CreateService(svc api.Service) error CreateService(svc *api.Service) error
GetService(name string) (*api.Service, error) GetService(name string) (*api.Service, error)
DeleteService(name string) error DeleteService(name string) error
UpdateService(svc api.Service) error UpdateService(svc *api.Service) error
WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
// TODO: endpoints and their implementation should be separated, setting endpoints should be // TODO: endpoints and their implementation should be separated, setting endpoints should be

View File

@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
@ -49,7 +50,7 @@ func NewRegistryStorage(registry Registry, cloud cloudprovider.Interface, machin
} }
} }
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { func (rs *RegistryStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) {
srv := obj.(*api.Service) srv := obj.(*api.Service)
if errs := validation.ValidateService(srv); len(errs) > 0 { if errs := validation.ValidateService(srv); len(errs) > 0 {
return nil, errors.NewInvalid("service", srv.ID, errs) return nil, errors.NewInvalid("service", srv.ID, errs)
@ -57,7 +58,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
srv.CreationTimestamp = util.Now() srv.CreationTimestamp = util.Now()
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
// TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers
// correctly no matter what http operations happen. // correctly no matter what http operations happen.
if srv.CreateExternalLoadBalancer { if srv.CreateExternalLoadBalancer {
@ -85,7 +86,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
return nil, err return nil, err
} }
} }
err := rs.registry.CreateService(*srv) err := rs.registry.CreateService(srv)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -93,18 +94,18 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
}), nil }), nil
} }
func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { func (rs *RegistryStorage) Delete(id string) (<-chan runtime.Object, error) {
service, err := rs.registry.GetService(id) service, err := rs.registry.GetService(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
rs.deleteExternalLoadBalancer(service) rs.deleteExternalLoadBalancer(service)
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(id) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(id)
}), nil }), nil
} }
func (rs *RegistryStorage) Get(id string) (interface{}, error) { func (rs *RegistryStorage) Get(id string) (runtime.Object, error) {
s, err := rs.registry.GetService(id) s, err := rs.registry.GetService(id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -112,7 +113,7 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) {
return s, err return s, err
} }
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { func (rs *RegistryStorage) List(selector labels.Selector) (runtime.Object, error) {
list, err := rs.registry.ListServices() list, err := rs.registry.ListServices()
if err != nil { if err != nil {
return nil, err return nil, err
@ -133,7 +134,7 @@ func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion u
return rs.registry.WatchServices(label, field, resourceVersion) return rs.registry.WatchServices(label, field, resourceVersion)
} }
func (rs RegistryStorage) New() interface{} { func (rs RegistryStorage) New() runtime.Object {
return &api.Service{} return &api.Service{}
} }
@ -155,14 +156,14 @@ func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.En
return result, nil return result, nil
} }
func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { func (rs *RegistryStorage) Update(obj runtime.Object) (<-chan runtime.Object, error) {
srv := obj.(*api.Service) srv := obj.(*api.Service)
if errs := validation.ValidateService(srv); len(errs) > 0 { if errs := validation.ValidateService(srv); len(errs) > 0 {
return nil, errors.NewInvalid("service", srv.ID, errs) return nil, errors.NewInvalid("service", srv.ID, errs)
} }
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
// TODO: check to see if external load balancer status changed // TODO: check to see if external load balancer status changed
err := rs.registry.UpdateService(*srv) err := rs.registry.UpdateService(srv)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -20,15 +20,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
// All api types must support the Object interface. It's deliberately tiny so that this is not an onerous
// burden. Implement it with a pointer reciever; this will allow us to use the go compiler to check the
// one thing about our objects that it's capable of checking for us.
type Object interface {
// This function is used only to enforce membership. It's never called.
// TODO: Consider mass rename in the future to make it do something useful.
IsAnAPIObject()
}
// Note that the types provided in this file are not versioned and are intended to be // Note that the types provided in this file are not versioned and are intended to be
// safe to use from within all versions of every API object. // safe to use from within all versions of every API object.

View File

@ -73,7 +73,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port)) endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port))
} }
// TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop. // TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop.
err = e.serviceRegistry.UpdateEndpoints(api.Endpoints{ err = e.serviceRegistry.UpdateEndpoints(&api.Endpoints{
JSONBase: api.JSONBase{ID: service.ID}, JSONBase: api.JSONBase{ID: service.ID},
Endpoints: endpoints, Endpoints: endpoints,
}) })

View File

@ -22,6 +22,7 @@ import (
"io" "io"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
@ -42,7 +43,7 @@ func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder {
// Decode blocks until it can return the next object in the stream. Returns an error // Decode blocks until it can return the next object in the stream. Returns an error
// if the stream is closed or an object can't be decoded. // if the stream is closed or an object can't be decoded.
func (d *APIEventDecoder) Decode() (action watch.EventType, object interface{}, err error) { func (d *APIEventDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var got api.WatchEvent var got api.WatchEvent
err = d.decoder.Decode(&got) err = d.decoder.Decode(&got)
if err != nil { if err != nil {

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
@ -38,19 +39,6 @@ var (
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
) )
// Codec provides methods for transforming Etcd values into objects and back.
type Codec interface {
Encode(obj interface{}) (data []byte, err error)
Decode(data []byte) (interface{}, error)
DecodeInto(data []byte, obj interface{}) error
}
// ResourceVersioner provides methods for managing object modification tracking.
type ResourceVersioner interface {
SetResourceVersion(obj interface{}, version uint64) error
ResourceVersion(obj interface{}) (uint64, error)
}
// EtcdClient is an injectable interface for testing. // EtcdClient is an injectable interface for testing.
type EtcdClient interface { type EtcdClient interface {
AddChild(key, data string, ttl uint64) (*etcd.Response, error) AddChild(key, data string, ttl uint64) (*etcd.Response, error)
@ -77,9 +65,9 @@ type EtcdGetSet interface {
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
type EtcdHelper struct { type EtcdHelper struct {
Client EtcdGetSet Client EtcdGetSet
Codec Codec Codec runtime.Codec
// optional, no atomic operations can be performed without this interface // optional, no atomic operations can be performed without this interface
ResourceVersioner ResourceVersioner ResourceVersioner runtime.ResourceVersioner
} }
// IsEtcdNotFound returns true iff err is an etcd not found error. // IsEtcdNotFound returns true iff err is an etcd not found error.
@ -151,9 +139,9 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersi
v := pv.Elem() v := pv.Elem()
for _, node := range nodes { for _, node := range nodes {
obj := reflect.New(v.Type().Elem()) obj := reflect.New(v.Type().Elem())
err = h.Codec.DecodeInto([]byte(node.Value), obj.Interface()) err = h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object))
if h.ResourceVersioner != nil { if h.ResourceVersioner != nil {
_ = h.ResourceVersioner.SetResourceVersion(obj.Interface(), node.ModifiedIndex) _ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
} }
if err != nil { if err != nil {
@ -167,12 +155,12 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersi
// ExtractObj unmarshals json found at key into objPtr. On a not found error, will either return // ExtractObj unmarshals json found at key into objPtr. On a not found error, will either return
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error. // empty responses and nil response nodes exactly like a not found error.
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error {
_, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
return err return err
} }
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) {
response, err := h.Client.Get(key, false, false) response, err := h.Client.Get(key, false, false)
if err != nil && !IsEtcdNotFound(err) { if err != nil && !IsEtcdNotFound(err) {
@ -198,7 +186,7 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
} }
// CreateObj adds a new object at a key unless it already exists. // CreateObj adds a new object at a key unless it already exists.
func (h *EtcdHelper) CreateObj(key string, obj interface{}) error { func (h *EtcdHelper) CreateObj(key string, obj runtime.Object) error {
data, err := h.Codec.Encode(obj) data, err := h.Codec.Encode(obj)
if err != nil { if err != nil {
return err return err
@ -221,7 +209,7 @@ func (h *EtcdHelper) Delete(key string, recursive bool) error {
// SetObj marshals obj via json, and stores under key. Will do an // SetObj marshals obj via json, and stores under key. Will do an
// atomic update if obj's ResourceVersion field is set. // atomic update if obj's ResourceVersion field is set.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error { func (h *EtcdHelper) SetObj(key string, obj runtime.Object) error {
data, err := h.Codec.Encode(obj) data, err := h.Codec.Encode(obj)
if err != nil { if err != nil {
return err return err
@ -240,7 +228,7 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update. // Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update.
// See the comment for AtomicUpdate for more detail. // See the comment for AtomicUpdate for more detail.
type EtcdUpdateFunc func(input interface{}) (output interface{}, err error) type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error)
// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects. // AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects.
// Note, tryUpdate may be called more than once. // Note, tryUpdate may be called more than once.
@ -248,7 +236,7 @@ type EtcdUpdateFunc func(input interface{}) (output interface{}, err error)
// Example: // Example:
// //
// h := &util.EtcdHelper{client, encoding, versioning} // h := &util.EtcdHelper{client, encoding, versioning}
// err := h.AtomicUpdate("myKey", &MyType{}, func(input interface{}) (interface{}, error) { // err := h.AtomicUpdate("myKey", &MyType{}, func(input runtime.Object) (runtime.Object, error) {
// // Before this function is called, currentObj has been reset to etcd's current // // Before this function is called, currentObj has been reset to etcd's current
// // contents for "myKey". // // contents for "myKey".
// //
@ -261,14 +249,14 @@ type EtcdUpdateFunc func(input interface{}) (output interface{}, err error)
// return cur, nil // return cur, nil
// }) // })
// //
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate EtcdUpdateFunc) error { func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, tryUpdate EtcdUpdateFunc) error {
pt := reflect.TypeOf(ptrToType) pt := reflect.TypeOf(ptrToType)
if pt.Kind() != reflect.Ptr { if pt.Kind() != reflect.Ptr {
// Panic is appropriate, because this is a programming error. // Panic is appropriate, because this is a programming error.
panic("need ptr to type") panic("need ptr to type")
} }
for { for {
obj := reflect.New(pt.Elem()).Interface() obj := reflect.New(pt.Elem()).Interface().(runtime.Object)
origBody, index, err := h.bodyAndExtractObj(key, obj, true) origBody, index, err := h.bodyAndExtractObj(key, obj, true)
if err != nil { if err != nil {
return err return err

View File

@ -19,6 +19,7 @@ package tools
import ( import (
"sync" "sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
@ -27,10 +28,10 @@ import (
// FilterFunc is a predicate which takes an API object and returns true // FilterFunc is a predicate which takes an API object and returns true
// iff the object should remain in the set. // iff the object should remain in the set.
type FilterFunc func(obj interface{}) bool type FilterFunc func(obj runtime.Object) bool
// Everything is a FilterFunc which accepts all objects. // Everything is a FilterFunc which accepts all objects.
func Everything(interface{}) bool { func Everything(runtime.Object) bool {
return true return true
} }
@ -59,7 +60,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface,
// change or wrap the serialized etcd object. // change or wrap the serialized etcd object.
// //
// startTime := time.Now() // startTime := time.Now()
// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) { // helper.WatchAndTransform(key, version, func(input runtime.Object) (runtime.Object, error) {
// value := input.(TimeAwareValue) // value := input.(TimeAwareValue)
// value.Since = startTime // value.Since = startTime
// return value, nil // return value, nil
@ -72,12 +73,12 @@ func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, trans
} }
// TransformFunc attempts to convert an object to another object for use with a watcher. // TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(interface{}) (interface{}, error) type TransformFunc func(runtime.Object) (runtime.Object, error)
// etcdWatcher converts a native etcd watch to a watch.Interface. // etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct { type etcdWatcher struct {
encoding Codec encoding runtime.Codec
versioner ResourceVersioner versioner runtime.ResourceVersioner
transform TransformFunc transform TransformFunc
list bool // If we're doing a recursive watch, should be true. list bool // If we're doing a recursive watch, should be true.
@ -98,7 +99,7 @@ type etcdWatcher struct {
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates. // and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher { func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{ w := &etcdWatcher{
encoding: encoding, encoding: encoding,
versioner: versioner, versioner: versioner,
@ -192,7 +193,7 @@ func (w *etcdWatcher) translate() {
} }
} }
func (w *etcdWatcher) decodeObject(data []byte, index uint64) (interface{}, error) { func (w *etcdWatcher) decodeObject(data []byte, index uint64) (runtime.Object, error) {
obj, err := w.encoding.Decode(data) obj, err := w.encoding.Decode(data)
if err != nil { if err != nil {
return nil, err return nil, err
@ -260,7 +261,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
} }
curObjPasses := w.filter(curObj) curObjPasses := w.filter(curObj)
oldObjPasses := false oldObjPasses := false
var oldObj interface{} var oldObj runtime.Object
if res.PrevNode != nil && res.PrevNode.Value != "" { if res.PrevNode != nil && res.PrevNode.Value != "" {
// Ignore problems reading the old object. // Ignore problems reading the old object.
if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil { if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil {

View File

@ -19,6 +19,7 @@ package watch
import ( import (
"sync" "sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
@ -27,7 +28,7 @@ type Decoder interface {
// Decode should return the type of event, the decoded object, or an error. // Decode should return the type of event, the decoded object, or an error.
// An error will cause StreamWatcher to call Close(). Decode should block until // An error will cause StreamWatcher to call Close(). Decode should block until
// it has data or an error occurs. // it has data or an error occurs.
Decode() (action EventType, object interface{}, err error) Decode() (action EventType, object runtime.Object, err error)
// Close should close the underlying io.Reader, signalling to the source of // Close should close the underlying io.Reader, signalling to the source of
// the stream that it is no longer being watched. Close() must cause any // the stream that it is no longer being watched. Close() must cause any

View File

@ -18,6 +18,8 @@ package watch
import ( import (
"sync" "sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
) )
// Mux distributes event notifications among any number of watchers. Every event // Mux distributes event notifications among any number of watchers. Every event
@ -88,7 +90,7 @@ func (m *Mux) closeAll() {
} }
// Action distributes the given event among all watchers. // Action distributes the given event among all watchers.
func (m *Mux) Action(action EventType, obj interface{}) { func (m *Mux) Action(action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj} m.incoming <- Event{action, obj}
} }

View File

@ -22,16 +22,19 @@ import (
"testing" "testing"
) )
func TestMux(t *testing.T) { type myType struct {
type myType struct {
ID string ID string
Value string Value string
} }
func (*myType) IsAnAPIObject() {}
func TestMux(t *testing.T) {
table := []Event{ table := []Event{
{Added, myType{"foo", "hello world 1"}}, {Added, &myType{"foo", "hello world 1"}},
{Added, myType{"bar", "hello world 2"}}, {Added, &myType{"bar", "hello world 2"}},
{Modified, myType{"foo", "goodbye world 3"}}, {Modified, &myType{"foo", "goodbye world 3"}},
{Deleted, myType{"bar", "hello world 4"}}, {Deleted, &myType{"bar", "hello world 4"}},
} }
// The mux we're testing // The mux we're testing

View File

@ -18,6 +18,8 @@ package watch
import ( import (
"sync" "sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
) )
// Interface can be implemented by anything that knows how to watch and report changes. // Interface can be implemented by anything that knows how to watch and report changes.
@ -47,7 +49,7 @@ type Event struct {
// If Type == Deleted, then this is the state of the object // If Type == Deleted, then this is the state of the object
// immediately before deletion. // immediately before deletion.
Object interface{} Object runtime.Object
} }
// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
@ -78,21 +80,21 @@ func (f *FakeWatcher) ResultChan() <-chan Event {
} }
// Add sends an add event. // Add sends an add event.
func (f *FakeWatcher) Add(obj interface{}) { func (f *FakeWatcher) Add(obj runtime.Object) {
f.result <- Event{Added, obj} f.result <- Event{Added, obj}
} }
// Modify sends a modify event. // Modify sends a modify event.
func (f *FakeWatcher) Modify(obj interface{}) { func (f *FakeWatcher) Modify(obj runtime.Object) {
f.result <- Event{Modified, obj} f.result <- Event{Modified, obj}
} }
// Delete sends a delete event. // Delete sends a delete event.
func (f *FakeWatcher) Delete(lastValue interface{}) { func (f *FakeWatcher) Delete(lastValue runtime.Object) {
f.result <- Event{Deleted, lastValue} f.result <- Event{Deleted, lastValue}
} }
// Action sends an event of the requested type, for table-based testing. // Action sends an event of the requested type, for table-based testing.
func (f *FakeWatcher) Action(action EventType, obj interface{}) { func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
f.result <- Event{action, obj} f.result <- Event{action, obj}
} }