Merge pull request #1338 from mlaventure/client-reconnect-fix

Client reconnect fix
This commit is contained in:
Phil Estes 2017-08-11 14:30:05 -04:00 committed by GitHub
commit a6be9f544d
11 changed files with 300 additions and 70 deletions

View File

@ -64,9 +64,10 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
gopts := []grpc.DialOption{ gopts := []grpc.DialOption{
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithTimeout(100 * time.Second), grpc.WithTimeout(60 * time.Second),
grpc.FailOnNonTempDialError(true), grpc.FailOnNonTempDialError(true),
grpc.WithDialer(dialer), grpc.WithBackoffMaxDelay(3 * time.Second),
grpc.WithDialer(Dialer),
} }
if len(copts.dialOptions) > 0 { if len(copts.dialOptions) > 0 {
gopts = copts.dialOptions gopts = copts.dialOptions
@ -78,7 +79,7 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
grpc.WithStreamInterceptor(stream), grpc.WithStreamInterceptor(stream),
) )
} }
conn, err := grpc.Dial(dialAddress(address), gopts...) conn, err := grpc.Dial(DialAddress(address), gopts...)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address) return nil, errors.Wrapf(err, "failed to dial %q", address)
} }

View File

@ -10,9 +10,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"runtime" "runtime"
"syscall"
"testing" "testing"
"time"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
@ -27,6 +25,8 @@ var (
noDaemon bool noDaemon bool
noCriu bool noCriu bool
supportsCriu bool supportsCriu bool
ctrd = &daemon{}
) )
func init() { func init() {
@ -55,7 +55,6 @@ func TestMain(m *testing.M) {
supportsCriu = err == nil && !noCriu supportsCriu = err == nil && !noCriu
var ( var (
cmd *exec.Cmd
buf = bytes.NewBuffer(nil) buf = bytes.NewBuffer(nil)
ctx, cancel = testContext() ctx, cancel = testContext()
) )
@ -64,27 +63,20 @@ func TestMain(m *testing.M) {
if !noDaemon { if !noDaemon {
os.RemoveAll(defaultRoot) os.RemoveAll(defaultRoot)
// setup a new containerd daemon if !testing.Short err := ctrd.start("containerd", address, []string{
cmd = exec.Command("containerd",
"--root", defaultRoot, "--root", defaultRoot,
"--address", address,
"--log-level", "debug", "--log-level", "debug",
) }, buf, buf)
cmd.Stdout = buf if err != nil {
cmd.Stderr = buf
if err := cmd.Start(); err != nil {
cmd.Wait()
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String()) fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
os.Exit(1) os.Exit(1)
} }
} }
client, err := waitForDaemonStart(ctx, address) client, err := ctrd.waitForStart(ctx)
if err != nil { if err != nil {
if cmd.Process != nil { ctrd.Kill()
cmd.Process.Kill() ctrd.Wait()
}
cmd.Wait()
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String()) fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
os.Exit(1) os.Exit(1)
} }
@ -105,8 +97,8 @@ func TestMain(m *testing.M) {
// pull a seed image // pull a seed image
if runtime.GOOS != "windows" { // TODO: remove once pull is supported on windows if runtime.GOOS != "windows" { // TODO: remove once pull is supported on windows
if _, err = client.Pull(ctx, testImage, WithPullUnpack); err != nil { if _, err = client.Pull(ctx, testImage, WithPullUnpack); err != nil {
cmd.Process.Signal(syscall.SIGTERM) ctrd.Stop()
cmd.Wait() ctrd.Wait()
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String()) fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
os.Exit(1) os.Exit(1)
} }
@ -126,12 +118,12 @@ func TestMain(m *testing.M) {
if !noDaemon { if !noDaemon {
// tear down the daemon and resources created // tear down the daemon and resources created
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { if err := ctrd.Stop(); err != nil {
if err := cmd.Process.Kill(); err != nil { if err := ctrd.Kill(); err != nil {
fmt.Fprintln(os.Stderr, "failed to signal containerd", err) fmt.Fprintln(os.Stderr, "failed to signal containerd", err)
} }
} }
if err := cmd.Wait(); err != nil { if err := ctrd.Wait(); err != nil {
if _, ok := err.(*exec.ExitError); !ok { if _, ok := err.(*exec.ExitError); !ok {
fmt.Fprintln(os.Stderr, "failed to wait for containerd", err) fmt.Fprintln(os.Stderr, "failed to wait for containerd", err)
} }
@ -148,28 +140,6 @@ func TestMain(m *testing.M) {
os.Exit(status) os.Exit(status)
} }
func waitForDaemonStart(ctx context.Context, address string) (*Client, error) {
var (
client *Client
serving bool
err error
)
for i := 0; i < 20; i++ {
if client == nil {
client, err = New(address)
}
if err == nil {
serving, err = client.IsServing(ctx)
if serving {
return client, nil
}
}
time.Sleep(100 * time.Millisecond)
}
return nil, fmt.Errorf("containerd did not start within 2s: %v", err)
}
func newClient(t testing.TB, address string, opts ...ClientOpt) (*Client, error) { func newClient(t testing.TB, address string, opts ...ClientOpt) (*Client, error) {
if testing.Short() { if testing.Short() {
t.Skip() t.Skip()

View File

@ -5,15 +5,70 @@ package containerd
import ( import (
"fmt" "fmt"
"net" "net"
"os"
"strings" "strings"
"syscall"
"time" "time"
"github.com/pkg/errors"
) )
func dialer(address string, timeout time.Duration) (net.Conn, error) { func isNoent(err error) bool {
address = strings.TrimPrefix(address, "unix://") if err != nil {
return net.DialTimeout("unix", address, timeout) if nerr, ok := err.(*net.OpError); ok {
if serr, ok := nerr.Err.(*os.SyscallError); ok {
if serr.Err == syscall.ENOENT {
return true
}
}
}
}
return false
} }
func dialAddress(address string) string { type dialResult struct {
c net.Conn
err error
}
func Dialer(address string, timeout time.Duration) (net.Conn, error) {
var (
stopC = make(chan struct{})
synC = make(chan *dialResult)
)
address = strings.TrimPrefix(address, "unix://")
go func() {
defer close(synC)
for {
select {
case <-stopC:
return
default:
c, err := net.DialTimeout("unix", address, timeout)
if isNoent(err) {
<-time.After(10 * time.Millisecond)
continue
}
synC <- &dialResult{c, err}
return
}
}
}()
select {
case dr := <-synC:
return dr.c, dr.err
case <-time.After(timeout):
close(stopC)
go func() {
dr := <-synC
if dr != nil {
dr.c.Close()
}
}()
return nil, errors.Errorf("dial %s: no such file or directory", address)
}
}
func DialAddress(address string) string {
return fmt.Sprintf("unix://%s", address) return fmt.Sprintf("unix://%s", address)
} }

View File

@ -7,10 +7,10 @@ import (
winio "github.com/Microsoft/go-winio" winio "github.com/Microsoft/go-winio"
) )
func dialer(address string, timeout time.Duration) (net.Conn, error) { func Dialer(address string, timeout time.Duration) (net.Conn, error) {
return winio.DialPipe(address, &timeout) return winio.DialPipe(address, &timeout)
} }
func dialAddress(address string) string { func DialAddress(address string) string {
return address return address
} }

View File

@ -11,6 +11,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/containerd/containerd"
eventsapi "github.com/containerd/containerd/api/services/events/v1" eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
@ -80,7 +81,7 @@ func main() {
if err != nil { if err != nil {
return err return err
} }
server := grpc.NewServer() server := newServer()
e, err := connectEvents(context.GlobalString("address")) e, err := connectEvents(context.GlobalString("address"))
if err != nil { if err != nil {
return err return err
@ -171,7 +172,7 @@ func dumpStacks() {
} }
func connectEvents(address string) (eventsapi.EventsClient, error) { func connectEvents(address string) (eventsapi.EventsClient, error) {
conn, err := connect(address, dialer) conn, err := connect(address, containerd.Dialer)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address) return nil, errors.Wrapf(err, "failed to dial %q", address)
} }
@ -182,26 +183,18 @@ func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*
gopts := []grpc.DialOption{ gopts := []grpc.DialOption{
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithTimeout(100 * time.Second), grpc.WithTimeout(60 * time.Second),
grpc.WithDialer(d), grpc.WithDialer(d),
grpc.FailOnNonTempDialError(true), grpc.FailOnNonTempDialError(true),
grpc.WithBackoffMaxDelay(3 * time.Second),
} }
conn, err := grpc.Dial(dialAddress(address), gopts...) conn, err := grpc.Dial(containerd.DialAddress(address), gopts...)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address) return nil, errors.Wrapf(err, "failed to dial %q", address)
} }
return conn, nil return conn, nil
} }
func dialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}
func dialAddress(address string) string {
return fmt.Sprintf("unix://%s", address)
}
type remoteEventsPublisher struct { type remoteEventsPublisher struct {
client eventsapi.EventsClient client eventsapi.EventsClient
} }

View File

@ -33,7 +33,7 @@ func setupSignals() (chan os.Signal, error) {
} }
func newServer() *grpc.Server { func newServer() *grpc.Server {
return grpc.NewServer(grpc.Creds(NewUnixSocketCredentils(0, 0))) return grpc.NewServer(grpc.Creds(NewUnixSocketCredentials(0, 0)))
} }
type unixSocketCredentials struct { type unixSocketCredentials struct {
@ -42,7 +42,7 @@ type unixSocketCredentials struct {
serverName string serverName string
} }
func NewUnixSocketCredentils(uid, gid int) credentials.TransportCredentials { func NewUnixSocketCredentials(uid, gid int) credentials.TransportCredentials {
return &unixSocketCredentials{uid, gid, "locahost"} return &unixSocketCredentials{uid, gid, "locahost"}
} }

View File

@ -4,7 +4,9 @@ package containerd
import ( import (
"context" "context"
"syscall"
"testing" "testing"
"time"
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
"github.com/containerd/containerd/linux/runcopts" "github.com/containerd/containerd/linux/runcopts"
@ -175,3 +177,94 @@ func TestShimInCgroup(t *testing.T) {
<-statusC <-statusC
} }
func TestDaemonRestart(t *testing.T) {
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
image Image
ctx, cancel = testContext()
id = t.Name()
)
defer cancel()
image, err = client.GetImage(ctx, testImage)
if err != nil {
t.Error(err)
return
}
spec, err := generateSpec(withImageConfig(ctx, image), withProcessArgs("sleep", "30"))
if err != nil {
t.Error(err)
return
}
container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image))
if err != nil {
t.Error(err)
return
}
defer container.Delete(ctx, WithSnapshotCleanup)
task, err := container.NewTask(ctx, Stdio)
if err != nil {
t.Error(err)
return
}
defer task.Delete(ctx)
synC := make(chan struct{})
statusC := make(chan uint32, 1)
go func() {
synC <- struct{}{}
status, err := task.Wait(ctx)
if err == nil {
t.Errorf(`first task.Wait() should have failed with "transport is closing"`)
}
statusC <- status
}()
<-synC
if err := task.Start(ctx); err != nil {
t.Error(err)
return
}
if err := ctrd.Restart(); err != nil {
t.Fatal(err)
}
<-statusC
serving := false
for i := 0; i < 20; i++ {
serving, err = client.IsServing(ctx)
if serving {
break
}
time.Sleep(100 * time.Millisecond)
}
if !serving {
t.Fatalf("containerd did not start within 2s: %v", err)
}
go func() {
synC <- struct{}{}
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
<-synC
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Fatal(err)
}
<-statusC
}

115
daemon_test.go Normal file
View File

@ -0,0 +1,115 @@
package containerd
import (
"context"
"fmt"
"io"
"os/exec"
"sync"
"syscall"
"time"
"github.com/pkg/errors"
)
type daemon struct {
sync.Mutex
addr string
cmd *exec.Cmd
}
func (d *daemon) start(name, address string, args []string, stdout, stderr io.Writer) error {
d.Lock()
defer d.Unlock()
if d.cmd != nil {
return errors.New("daemon is already running")
}
args = append(args, []string{"--address", address}...)
cmd := exec.Command(name, args...)
cmd.Stdout = stdout
cmd.Stderr = stderr
if err := cmd.Start(); err != nil {
cmd.Wait()
return errors.Wrap(err, "failed to start daemon")
}
d.addr = address
d.cmd = cmd
return nil
}
func (d *daemon) waitForStart(ctx context.Context) (*Client, error) {
var (
client *Client
serving bool
err error
)
for i := 0; i < 20; i++ {
if client == nil {
client, err = New(d.addr)
}
if err == nil {
serving, err = client.IsServing(ctx)
if serving {
return client, nil
}
}
time.Sleep(100 * time.Millisecond)
}
return nil, fmt.Errorf("containerd did not start within 2s: %v", err)
}
func (d *daemon) Stop() error {
d.Lock()
d.Unlock()
if d.cmd == nil {
return errors.New("daemon is not running")
}
return d.cmd.Process.Signal(syscall.SIGTERM)
}
func (d *daemon) Kill() error {
d.Lock()
d.Unlock()
if d.cmd == nil {
return errors.New("daemon is not running")
}
return d.cmd.Process.Kill()
}
func (d *daemon) Wait() error {
d.Lock()
d.Unlock()
if d.cmd == nil {
return errors.New("daemon is not running")
}
return d.cmd.Wait()
}
func (d *daemon) Restart() error {
d.Lock()
d.Unlock()
if d.cmd == nil {
return errors.New("daemon is not running")
}
var err error
if err = d.cmd.Process.Signal(syscall.SIGTERM); err != nil {
return errors.Wrap(err, "failed to signal daemon")
}
d.cmd.Wait()
<-time.After(1 * time.Second)
cmd := exec.Command(d.cmd.Path, d.cmd.Args[1:]...)
cmd.Stdout = d.cmd.Stdout
cmd.Stderr = d.cmd.Stderr
if err := cmd.Start(); err != nil {
cmd.Wait()
return errors.Wrap(err, "failed to start new daemon instance")
}
d.cmd = cmd
return nil
}

View File

@ -17,10 +17,11 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
func loadBundle(path, workdir, namespace string, events *events.Exchange) *bundle { func loadBundle(path, workdir, namespace, id string, events *events.Exchange) *bundle {
return &bundle{ return &bundle{
path: path, path: path,
namespace: namespace, namespace: namespace,
id: id,
events: events, events: events,
workDir: workdir, workDir: workdir,
} }

View File

@ -215,6 +215,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
filepath.Join(r.state, namespace, lc.id), filepath.Join(r.state, namespace, lc.id),
filepath.Join(r.root, namespace, lc.id), filepath.Join(r.root, namespace, lc.id),
namespace, namespace,
lc.id,
r.events, r.events,
) )
if err := bundle.Delete(); err != nil { if err := bundle.Delete(); err != nil {
@ -267,7 +268,8 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
continue continue
} }
id := path.Name() id := path.Name()
bundle := loadBundle(filepath.Join(r.state, ns, id), filepath.Join(r.root, ns, id), ns, r.events) bundle := loadBundle(filepath.Join(r.state, ns, id),
filepath.Join(r.root, ns, id), ns, id, r.events)
s, err := bundle.Connect(ctx, r.remote) s, err := bundle.Connect(ctx, r.remote)
if err != nil { if err != nil {

View File

@ -220,7 +220,7 @@ func (t *task) Wait(ctx context.Context) (uint32, error) {
for { for {
evt, err := eventstream.Recv() evt, err := eventstream.Recv()
if err != nil { if err != nil {
return UnknownExitStatus, err return UnknownExitStatus, errdefs.FromGRPC(err)
} }
if typeurl.Is(evt.Event, &eventsapi.TaskExit{}) { if typeurl.Is(evt.Event, &eventsapi.TaskExit{}) {
v, err := typeurl.UnmarshalAny(evt.Event) v, err := typeurl.UnmarshalAny(evt.Event)