ttrpc: implement Close and Shutdown

This apples logic to correctly Close a server, as well as implements
graceful shutdown. This ensures that inflight requests are not
interrupted and works similar to the functionality in `net/http`.

This required a fair bit of refactoring around how the connection is
managed. The connection now has an explicit wrapper object, ensuring
that shutdown happens in a coordinated fashion, whether or not a
forceful close or graceful shutdown is called.

In addition to the above, hardening around the accept loop has been
added. We now correctly exit on non-temporary errors and debounce the
accept call when encountering repeated errors. This should address some
issues where `SIGTERM` was not honored when dropping into the accept
spin.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day
2017-11-28 17:17:54 -08:00
parent 8d40df6f6d
commit b1feeec836
3 changed files with 460 additions and 61 deletions

View File

@@ -7,6 +7,7 @@ import (
"reflect"
"strings"
"testing"
"time"
"github.com/gogo/protobuf/proto"
)
@@ -74,34 +75,26 @@ func init() {
func TestServer(t *testing.T) {
var (
ctx = context.Background()
server = NewServer()
testImpl = &testingServer{}
ctx = context.Background()
server = NewServer()
testImpl = &testingServer{}
addr, listener = newTestListener(t)
client, cleanup = newTestClient(t, addr)
tclient = newTestingClient(client)
)
registerTestingService(server, testImpl)
addr := "\x00" + t.Name()
listener, err := net.Listen("unix", addr)
if err != nil {
t.Fatal(err)
}
defer listener.Close()
defer cleanup()
registerTestingService(server, testImpl)
go server.Serve(listener)
defer server.Shutdown(ctx)
conn, err := net.Dial("unix", addr)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
client := newTestingClient(NewClient(conn))
const calls = 2
results := make(chan callResult, 2)
go roundTrip(ctx, t, client, "bar", results)
go roundTrip(ctx, t, client, "baz", results)
go roundTrip(ctx, t, tclient, "bar", results)
go roundTrip(ctx, t, tclient, "baz", results)
for i := 0; i < calls; i++ {
result := <-results
@@ -111,6 +104,140 @@ func TestServer(t *testing.T) {
}
}
func newTestClient(t *testing.T, addr string) (*Client, func()) {
conn, err := net.Dial("unix", addr)
if err != nil {
t.Fatal(err)
}
client := NewClient(conn)
return client, func() {
conn.Close()
client.Close()
}
}
func TestServerListenerClosed(t *testing.T) {
var (
server = NewServer()
_, listener = newTestListener(t)
errs = make(chan error, 1)
)
go func() {
errs <- server.Serve(listener)
}()
if err := listener.Close(); err != nil {
t.Fatal(err)
}
err := <-errs
if err == nil {
t.Fatal(err)
}
}
func TestServerShutdown(t *testing.T) {
var (
ctx = context.Background()
server = NewServer()
addr, listener = newTestListener(t)
shutdownStarted = make(chan struct{})
shutdownFinished = make(chan struct{})
errs = make(chan error, 1)
client, cleanup = newTestClient(t, addr)
_, cleanup2 = newTestClient(t, addr) // secondary connection
)
defer cleanup()
defer cleanup2()
defer server.Close()
// register a service that takes until we tell it to stop
server.Register(serviceName, map[string]Method{
"Test": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req testPayload
if err := unmarshal(&req); err != nil {
return nil, err
}
return &testPayload{Foo: "waited"}, nil
},
})
go func() {
errs <- server.Serve(listener)
}()
tp := testPayload{Foo: "half"}
// send a few half requests
if err := client.sendRequest(ctx, 1, "testService", "Test", &tp); err != nil {
t.Fatal(err)
}
if err := client.sendRequest(ctx, 3, "testService", "Test", &tp); err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Millisecond) // ensure that requests make it through before shutdown
go func() {
close(shutdownStarted)
server.Shutdown(ctx)
// server.Close()
close(shutdownFinished)
}()
<-shutdownStarted
// receive the responses
if err := client.recvResponse(ctx, 1, &tp); err != nil {
t.Fatal(err)
}
if err := client.recvResponse(ctx, 3, &tp); err != nil {
t.Fatal(err)
}
<-shutdownFinished
checkServerShutdown(t, server)
}
func TestServerClose(t *testing.T) {
var (
server = NewServer()
_, listener = newTestListener(t)
startClose = make(chan struct{})
errs = make(chan error, 1)
)
go func() {
close(startClose)
errs <- server.Serve(listener)
}()
<-startClose
if err := server.Close(); err != nil {
t.Fatal(err)
}
err := <-errs
if err != ErrServerClosed {
t.Fatal("expected an error from a closed server", err)
}
checkServerShutdown(t, server)
}
func checkServerShutdown(t *testing.T, server *Server) {
t.Helper()
server.mu.Lock()
defer server.mu.Unlock()
if len(server.listeners) > 0 {
t.Fatalf("expected listeners to be empty: %v", server.listeners)
}
if len(server.connections) > 0 {
t.Fatalf("expected connections to be empty: %v", server.connections)
}
}
type callResult struct {
input *testPayload
expected *testPayload
@@ -136,3 +263,13 @@ func roundTrip(ctx context.Context, t *testing.T, client *testingClient, value s
received: resp,
}
}
func newTestListener(t *testing.T) (string, net.Listener) {
addr := "\x00" + t.Name()
listener, err := net.Listen("unix", addr)
if err != nil {
t.Fatal(err)
}
return addr, listener
}