Address comments; also, demonstrate one more property in test.

This commit is contained in:
Daniel Smith
2014-06-26 10:51:29 -07:00
parent c9246dc130
commit afd13edd6a
6 changed files with 69 additions and 39 deletions

View File

@@ -37,6 +37,8 @@ func init() {
MinionList{}, MinionList{},
Minion{}, Minion{},
Status{}, Status{},
ServerOpList{},
ServerOp{},
) )
} }

View File

@@ -207,3 +207,14 @@ const (
StatusFailure = "failure" StatusFailure = "failure"
StatusWorking = "working" StatusWorking = "working"
) )
// Operation information, as delivered to API clients.
type ServerOp struct {
JSONBase `yaml:",inline" json:",inline"`
}
// Operation list, as delivered to API clients.
type ServerOpList struct {
JSONBase `yaml:",inline" json:",inline"`
Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"`
}

View File

@@ -41,9 +41,14 @@ type RESTStorage interface {
Update(interface{}) (<-chan interface{}, error) Update(interface{}) (<-chan interface{}, error)
} }
// WorkFunc is used to perform any time consuming work for an api call, after
// the input has been validated. Pass one of these to MakeAsync to create an
// appropriate return value for the Update, Delete, and Create methods.
type WorkFunc func() (result interface{}, err error)
// MakeAsync takes a function and executes it, delivering the result in the way required // MakeAsync takes a function and executes it, delivering the result in the way required
// by RESTStorage's Update, Delete, and Create methods. // by RESTStorage's Update, Delete, and Create methods.
func MakeAsync(fn func() (interface{}, error)) <-chan interface{} { func MakeAsync(fn WorkFunc) <-chan interface{} {
channel := make(chan interface{}) channel := make(chan interface{})
go func() { go func() {
defer util.HandleCrash() defer util.HandleCrash()
@@ -171,7 +176,7 @@ func (server *ApiServer) finishReq(out <-chan interface{}, sync bool, timeout ti
if sync { if sync {
op.WaitFor(timeout) op.WaitFor(timeout)
} }
obj, complete := op.Describe() obj, complete := op.StatusOrResult()
if complete { if complete {
server.write(http.StatusOK, obj, w) server.write(http.StatusOK, obj, w)
} else { } else {
@@ -308,7 +313,7 @@ func (server *ApiServer) handleOperationRequest(parts []string, w http.ResponseW
server.notFound(req, w) server.notFound(req, w)
} }
obj, complete := op.Describe() obj, complete := op.StatusOrResult()
if complete { if complete {
server.write(http.StatusOK, obj, w) server.write(http.StatusOK, obj, w)
} else { } else {

View File

@@ -60,7 +60,8 @@ type SimpleRESTStorage struct {
updated Simple updated Simple
created Simple created Simple
// called when answering update, delete, create // If non-nil, called inside the WorkFunc when answering update, delete, create.
// obj recieves the original input to the update, delete, or create call.
injectedFunction func(obj interface{}) (returnObj interface{}, err error) injectedFunction func(obj interface{}) (returnObj interface{}, err error)
} }

View File

@@ -17,30 +17,16 @@ limitations under the License.
package apiserver package apiserver
import ( import (
"fmt"
"sort" "sort"
"strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
func init() {
api.AddKnownTypes(ServerOp{}, ServerOpList{})
}
// Operation information, as delivered to API clients.
type ServerOp struct {
api.JSONBase `yaml:",inline" json:",inline"`
}
// Operation list, as delivered to API clients.
type ServerOpList struct {
api.JSONBase `yaml:",inline" json:",inline"`
Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"`
}
// Operation represents an ongoing action which the server is performing. // Operation represents an ongoing action which the server is performing.
type Operation struct { type Operation struct {
ID string ID string
@@ -53,9 +39,12 @@ type Operation struct {
// Operations tracks all the ongoing operations. // Operations tracks all the ongoing operations.
type Operations struct { type Operations struct {
// Access only using functions from atomic.
lastID int64
// 'lock' guards the ops map.
lock sync.Mutex lock sync.Mutex
ops map[string]*Operation ops map[string]*Operation
nextID int
} }
// Returns a new Operations repository. // Returns a new Operations repository.
@@ -67,25 +56,28 @@ func NewOperations() *Operations {
return ops return ops
} }
// Add a new operation. // Add a new operation. Lock-free.
func (ops *Operations) NewOperation(from <-chan interface{}) *Operation { func (ops *Operations) NewOperation(from <-chan interface{}) *Operation {
ops.lock.Lock() id := atomic.AddInt64(&ops.lastID, 1)
defer ops.lock.Unlock()
id := fmt.Sprintf("%v", ops.nextID)
ops.nextID++
op := &Operation{ op := &Operation{
ID: id, ID: strconv.FormatInt(id, 10),
awaiting: from, awaiting: from,
notify: make(chan bool, 1), notify: make(chan bool, 1),
} }
go op.wait() go op.wait()
ops.ops[id] = op go ops.insert(op)
return op return op
} }
// Inserts op into the ops map.
func (ops *Operations) insert(op *Operation) {
ops.lock.Lock()
defer ops.lock.Unlock()
ops.ops[op.ID] = op
}
// List operations for an API client. // List operations for an API client.
func (ops *Operations) List() ServerOpList { func (ops *Operations) List() api.ServerOpList {
ops.lock.Lock() ops.lock.Lock()
defer ops.lock.Unlock() defer ops.lock.Unlock()
@@ -94,9 +86,9 @@ func (ops *Operations) List() ServerOpList {
ids = append(ids, id) ids = append(ids, id)
} }
sort.StringSlice(ids).Sort() sort.StringSlice(ids).Sort()
ol := ServerOpList{} ol := api.ServerOpList{}
for _, id := range ids { for _, id := range ids {
ol.Items = append(ol.Items, ServerOp{JSONBase: api.JSONBase{ID: id}}) ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}})
} }
return ol return ol
} }
@@ -124,7 +116,9 @@ func (ops *Operations) expire(maxAge time.Duration) {
// Waits forever for the operation to complete; call via go when // Waits forever for the operation to complete; call via go when
// the operation is created. Sets op.finished when the operation // the operation is created. Sets op.finished when the operation
// does complete. Does not keep op locked while waiting. // does complete, and sends on the notify channel, in case there
// are any WaitFor() calls in progress.
// Does not keep op locked while waiting.
func (op *Operation) wait() { func (op *Operation) wait() {
defer util.HandleCrash() defer util.HandleCrash()
result := <-op.awaiting result := <-op.awaiting
@@ -161,7 +155,7 @@ func (op *Operation) expired(limitTime time.Time) bool {
// Return status information or the result of the operation if it is complete, // Return status information or the result of the operation if it is complete,
// with a bool indicating true in the latter case. // with a bool indicating true in the latter case.
func (op *Operation) Describe() (description interface{}, finished bool) { func (op *Operation) StatusOrResult() (description interface{}, finished bool) {
op.lock.Lock() op.lock.Lock()
defer op.lock.Unlock() defer op.lock.Unlock()

View File

@@ -17,6 +17,7 @@ limitations under the License.
package apiserver package apiserver
import ( import (
"sync/atomic"
"testing" "testing"
"time" "time"
) )
@@ -26,6 +27,10 @@ func TestOperation(t *testing.T) {
c := make(chan interface{}) c := make(chan interface{})
op := ops.NewOperation(c) op := ops.NewOperation(c)
// Allow context switch, so that op's ID can get added to the map and Get will work.
// This is just so we can test Get. Ordinary users have no need to call Get immediately
// after calling NewOperation, because it returns the operation directly.
time.Sleep(time.Millisecond)
go func() { go func() {
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
c <- "All done" c <- "All done"
@@ -40,16 +45,28 @@ func TestOperation(t *testing.T) {
} }
op.WaitFor(10 * time.Millisecond) op.WaitFor(10 * time.Millisecond)
if _, completed := op.Describe(); completed { if _, completed := op.StatusOrResult(); completed {
t.Errorf("Unexpectedly fast completion") t.Errorf("Unexpectedly fast completion")
} }
op.WaitFor(time.Second) const waiters = 10
if _, completed := op.Describe(); !completed { var waited int32
for i := 0; i < waiters; i++ {
go func() {
op.WaitFor(time.Hour)
atomic.AddInt32(&waited, 1)
}()
}
op.WaitFor(time.Minute)
if _, completed := op.StatusOrResult(); !completed {
t.Errorf("Unexpectedly slow completion") t.Errorf("Unexpectedly slow completion")
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if waited != waiters {
t.Errorf("Multiple waiters doesn't work, only %v finished", waited)
}
if op.expired(time.Now().Add(-time.Second)) { if op.expired(time.Now().Add(-time.Second)) {
t.Errorf("Should not be expired: %#v", op) t.Errorf("Should not be expired: %#v", op)