Make GCE PD attach block on pending detach. Retry on detach/attach errors.
This commit is contained in:
@@ -37,6 +37,9 @@ type OperationManager interface {
|
||||
// Attempts to send msg to the channel associated with ID.
|
||||
// Returns an error if no associated channel exists.
|
||||
Send(id string, msg interface{}) error
|
||||
|
||||
// Returns true if an entry with the specified ID already exists.
|
||||
Exists(id string) bool
|
||||
}
|
||||
|
||||
// Returns a new instance of a channel manager.
|
||||
@@ -90,3 +93,11 @@ func (cm *operationManager) Send(id string, msg interface{}) error {
|
||||
cm.chanMap[id] <- msg
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns true if an entry with the specified ID already exists.
|
||||
func (cm *operationManager) Exists(id string) (exists bool) {
|
||||
cm.RLock()
|
||||
defer cm.RUnlock()
|
||||
_, exists = cm.chanMap[id]
|
||||
return
|
||||
}
|
||||
|
@@ -32,15 +32,10 @@ func TestStart(t *testing.T) {
|
||||
sigErr := cm.Send(chanId, testMsg)
|
||||
|
||||
// Assert
|
||||
if startErr != nil {
|
||||
t.Fatalf("Unexpected error on Start. Expected: <no error> Actual: <%v>", startErr)
|
||||
}
|
||||
if sigErr != nil {
|
||||
t.Fatalf("Unexpected error on Send. Expected: <no error> Actual: <%v>", sigErr)
|
||||
}
|
||||
if actual := <-ch; actual != testMsg {
|
||||
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg, actual)
|
||||
}
|
||||
verifyNoError(t, startErr, "Start")
|
||||
verifyNoError(t, sigErr, "Send")
|
||||
actualMsg := <-ch
|
||||
verifyMsg(t, testMsg /* expected */, actualMsg.(string) /* actual */)
|
||||
}
|
||||
|
||||
func TestStartIdExists(t *testing.T) {
|
||||
@@ -53,12 +48,8 @@ func TestStartIdExists(t *testing.T) {
|
||||
_, startErr2 := cm.Start(chanId, 1 /* bufferSize */)
|
||||
|
||||
// Assert
|
||||
if startErr1 != nil {
|
||||
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
|
||||
}
|
||||
if startErr2 == nil {
|
||||
t.Fatalf("Expected error on Start2. Expected: <id already exists error> Actual: <no error>")
|
||||
}
|
||||
verifyNoError(t, startErr1, "Start1")
|
||||
verifyError(t, startErr2, "Start2")
|
||||
}
|
||||
|
||||
func TestStartAndAdd2Chans(t *testing.T) {
|
||||
@@ -76,25 +67,14 @@ func TestStartAndAdd2Chans(t *testing.T) {
|
||||
sigErr2 := cm.Send(chanId2, testMsg2)
|
||||
|
||||
// Assert
|
||||
if startErr1 != nil {
|
||||
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
|
||||
}
|
||||
if startErr2 != nil {
|
||||
t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
|
||||
}
|
||||
if sigErr1 != nil {
|
||||
t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
|
||||
}
|
||||
if sigErr2 != nil {
|
||||
t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
|
||||
}
|
||||
if actual := <-ch1; actual != testMsg1 {
|
||||
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
|
||||
}
|
||||
if actual := <-ch2; actual != testMsg2 {
|
||||
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
|
||||
}
|
||||
|
||||
verifyNoError(t, startErr1, "Start1")
|
||||
verifyNoError(t, startErr2, "Start2")
|
||||
verifyNoError(t, sigErr1, "Send1")
|
||||
verifyNoError(t, sigErr2, "Send2")
|
||||
actualMsg1 := <-ch1
|
||||
actualMsg2 := <-ch2
|
||||
verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */)
|
||||
verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */)
|
||||
}
|
||||
|
||||
func TestStartAndAdd2ChansAndClose(t *testing.T) {
|
||||
@@ -114,26 +94,66 @@ func TestStartAndAdd2ChansAndClose(t *testing.T) {
|
||||
sigErr3 := cm.Send(chanId1, testMsg1)
|
||||
|
||||
// Assert
|
||||
if startErr1 != nil {
|
||||
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
|
||||
}
|
||||
if startErr2 != nil {
|
||||
t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
|
||||
}
|
||||
if sigErr1 != nil {
|
||||
t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
|
||||
}
|
||||
if sigErr2 != nil {
|
||||
t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
|
||||
}
|
||||
if sigErr3 == nil {
|
||||
t.Fatalf("Expected error on Send3. Expected: <error> Actual: <no error>", sigErr2)
|
||||
}
|
||||
if actual := <-ch1; actual != testMsg1 {
|
||||
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
|
||||
}
|
||||
if actual := <-ch2; actual != testMsg2 {
|
||||
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
|
||||
}
|
||||
|
||||
verifyNoError(t, startErr1, "Start1")
|
||||
verifyNoError(t, startErr2, "Start2")
|
||||
verifyNoError(t, sigErr1, "Send1")
|
||||
verifyNoError(t, sigErr2, "Send2")
|
||||
verifyError(t, sigErr3, "Send3")
|
||||
actualMsg1 := <-ch1
|
||||
actualMsg2 := <-ch2
|
||||
verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */)
|
||||
verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */)
|
||||
}
|
||||
|
||||
func TestExists(t *testing.T) {
|
||||
// Arrange
|
||||
cm := NewOperationManager()
|
||||
chanId1 := "testChanId1"
|
||||
chanId2 := "testChanId2"
|
||||
|
||||
// Act & Assert
|
||||
verifyExists(t, cm, chanId1, false /* expected */)
|
||||
verifyExists(t, cm, chanId2, false /* expected */)
|
||||
|
||||
_, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
|
||||
verifyNoError(t, startErr1, "Start1")
|
||||
verifyExists(t, cm, chanId1, true /* expected */)
|
||||
verifyExists(t, cm, chanId2, false /* expected */)
|
||||
|
||||
_, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
|
||||
verifyNoError(t, startErr2, "Start2")
|
||||
verifyExists(t, cm, chanId1, true /* expected */)
|
||||
verifyExists(t, cm, chanId2, true /* expected */)
|
||||
|
||||
cm.Close(chanId1)
|
||||
verifyExists(t, cm, chanId1, false /* expected */)
|
||||
verifyExists(t, cm, chanId2, true /* expected */)
|
||||
|
||||
cm.Close(chanId2)
|
||||
verifyExists(t, cm, chanId1, false /* expected */)
|
||||
verifyExists(t, cm, chanId2, false /* expected */)
|
||||
}
|
||||
|
||||
func verifyExists(t *testing.T, cm OperationManager, id string, expected bool) {
|
||||
if actual := cm.Exists(id); expected != actual {
|
||||
t.Fatalf("Unexpected Exists(%q) response. Expected: <%v> Actual: <%v>", id, expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyNoError(t *testing.T, err error, name string) {
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected response on %q. Expected: <no error> Actual: <%v>", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyError(t *testing.T, err error, name string) {
|
||||
if err == nil {
|
||||
t.Fatalf("Unexpected response on %q. Expected: <error> Actual: <no error>")
|
||||
}
|
||||
}
|
||||
|
||||
func verifyMsg(t *testing.T, expected, actual string) {
|
||||
if actual != expected {
|
||||
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", expected, actual)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user