diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 4f15dbb29..9595debe9 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -242,6 +242,10 @@ "ImportPath": "github.com/golang/protobuf/ptypes/empty", "Rev": "8ee79997227bf9b34611aee7946ae64735e6fd93" }, + { + "ImportPath": "github.com/jpillora/backoff", + "Rev": "06c7a16c845dc8e0bf575fafeeca0f5462f5eb4d" + }, { "ImportPath": "github.com/kubernetes-incubator/cri-o/pkg/ocicni", "Rev": "f648cd6e60948e4da391040e5c75d8175fea4fb7" diff --git a/cmd/cri-containerd/cri_containerd.go b/cmd/cri-containerd/cri_containerd.go index d0530a540..5ab1ddcfa 100644 --- a/cmd/cri-containerd/cri_containerd.go +++ b/cmd/cri-containerd/cri_containerd.go @@ -48,9 +48,7 @@ func main() { if err != nil { glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err) } - if err := service.Start(); err != nil { - glog.Exitf("Failed to start CRI containerd service: %v", err) - } + service.Start() s := server.NewCRIContainerdServer(o.SocketPath, service, service) if err := s.Run(); err != nil { diff --git a/pkg/server/events.go b/pkg/server/events.go index bb28fd04a..ba83db384 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -17,47 +17,65 @@ limitations under the License. package server import ( - "github.com/golang/glog" - "golang.org/x/net/context" + "time" "github.com/containerd/containerd/api/services/execution" "github.com/containerd/containerd/api/types/container" + "github.com/golang/glog" + "github.com/jpillora/backoff" + "golang.org/x/net/context" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) +const ( + // minRetryInterval is the minimum retry interval when lost connection with containerd. + minRetryInterval = 100 * time.Millisecond + // maxRetryInterval is the maximum retry interval when lost connection with containerd. + maxRetryInterval = 30 * time.Second + // exponentialFactor is the exponential backoff factor. + exponentialFactor = 2.0 +) + // startEventMonitor starts an event monitor which monitors and handles all // container events. -// TODO(random-liu): [P1] Figure out: -// 1) Is it possible to drop event during containerd is running? -// 2) How to deal with containerd down? We should restart event monitor, and -// we should recover all container state. -func (c *criContainerdService) startEventMonitor() error { - events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{}) - if err != nil { - return err +// TODO(random-liu): [P1] Is it possible to drop event during containerd is running? +func (c *criContainerdService) startEventMonitor() { + b := backoff.Backoff{ + Min: minRetryInterval, + Max: maxRetryInterval, + Factor: exponentialFactor, } go func() { for { - c.handleEventStream(events) + events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{}) + if err != nil { + glog.Errorf("Failed to connect to containerd event stream: %v", err) + time.Sleep(b.Duration()) + continue + } + // Successfully connect with containerd, reset backoff. + b.Reset() + // TODO(random-liu): Relist to recover state, should prevent other operations + // until state is fully recovered. + if err := c.handleEventStream(events); err != nil { + glog.Errorf("Failed to handle event stream: %v", err) + time.Sleep(b.Duration()) + continue + } } }() - return nil } // handleEventStream receives an event from containerd and handles the event. -func (c *criContainerdService) handleEventStream(events execution.ContainerService_EventsClient) { - // TODO(random-liu): [P1] Should backoff on this error, or else this will - // cause a busy loop. - // TODO(random-liu): Handle io.EOF. +func (c *criContainerdService) handleEventStream(events execution.ContainerService_EventsClient) error { e, err := events.Recv() if err != nil { - glog.Errorf("Failed to receive event: %v", err) - return + return err } glog.V(2).Infof("Received container event: %+v", e) c.handleEvent(e) - return + return nil } // handleEvent handles a containerd event. diff --git a/pkg/server/service.go b/pkg/server/service.go index 73660d7e8..939bb809d 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -54,7 +54,7 @@ import ( // CRIContainerdService is the interface implement CRI remote service server. type CRIContainerdService interface { - Start() error + Start() runtime.RuntimeServiceServer runtime.ImageServiceServer } @@ -128,6 +128,6 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir return c, nil } -func (c *criContainerdService) Start() error { - return c.startEventMonitor() +func (c *criContainerdService) Start() { + c.startEventMonitor() } diff --git a/vendor/github.com/jpillora/backoff/LICENSE b/vendor/github.com/jpillora/backoff/LICENSE new file mode 100644 index 000000000..1cc708081 --- /dev/null +++ b/vendor/github.com/jpillora/backoff/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2017 Jaime Pillora + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/jpillora/backoff/README.md b/vendor/github.com/jpillora/backoff/README.md new file mode 100644 index 000000000..81e77cd70 --- /dev/null +++ b/vendor/github.com/jpillora/backoff/README.md @@ -0,0 +1,119 @@ +# Backoff + +A simple exponential backoff counter in Go (Golang) + +[![GoDoc](https://godoc.org/github.com/jpillora/backoff?status.svg)](https://godoc.org/github.com/jpillora/backoff) [![Circle CI](https://circleci.com/gh/jpillora/backoff.svg?style=shield)](https://circleci.com/gh/jpillora/backoff) + +### Install + +``` +$ go get -v github.com/jpillora/backoff +``` + +### Usage + +Backoff is a `time.Duration` counter. It starts at `Min`. After every call to `Duration()` it is multiplied by `Factor`. It is capped at `Max`. It returns to `Min` on every call to `Reset()`. `Jitter` adds randomness ([see below](#example-using-jitter)). Used in conjunction with the `time` package. + +--- + +#### Simple example + +``` go + +b := &backoff.Backoff{ + //These are the defaults + Min: 100 * time.Millisecond, + Max: 10 * time.Second, + Factor: 2, + Jitter: false, +} + +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) + +fmt.Printf("Reset!\n") +b.Reset() + +fmt.Printf("%s\n", b.Duration()) +``` + +``` +100ms +200ms +400ms +Reset! +100ms +``` + +--- + +#### Example using `net` package + +``` go +b := &backoff.Backoff{ + Max: 5 * time.Minute, +} + +for { + conn, err := net.Dial("tcp", "example.com:5309") + if err != nil { + d := b.Duration() + fmt.Printf("%s, reconnecting in %s", err, d) + time.Sleep(d) + continue + } + //connected + b.Reset() + conn.Write([]byte("hello world!")) + // ... Read ... Write ... etc + conn.Close() + //disconnected +} + +``` + +--- + +#### Example using `Jitter` + +Enabling `Jitter` adds some randomization to the backoff durations. [See Amazon's writeup of performance gains using jitter](http://www.awsarchitectureblog.com/2015/03/backoff.html). Seeding is not necessary but doing so gives repeatable results. + +```go +import "math/rand" + +b := &backoff.Backoff{ + Jitter: true, +} + +rand.Seed(42) + +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) + +fmt.Printf("Reset!\n") +b.Reset() + +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) +``` + +``` +100ms +106.600049ms +281.228155ms +Reset! +100ms +104.381845ms +214.957989ms +``` + +#### Documentation + +https://godoc.org/github.com/jpillora/backoff + +#### Credits + +Forked from some JavaScript written by [@tj](https://github.com/tj) \ No newline at end of file diff --git a/vendor/github.com/jpillora/backoff/backoff.go b/vendor/github.com/jpillora/backoff/backoff.go new file mode 100644 index 000000000..a50d0e956 --- /dev/null +++ b/vendor/github.com/jpillora/backoff/backoff.go @@ -0,0 +1,88 @@ +// Package backoff provides an exponential-backoff implementation. +package backoff + +import ( + "math" + "math/rand" + "time" +) + +// Backoff is a time.Duration counter, starting at Min. After every call to +// the Duration method the current timing is multiplied by Factor, but it +// never exceeds Max. +// +// Backoff is not generally concurrent-safe, but the ForAttempt method can +// be used concurrently. +type Backoff struct { + //Factor is the multiplying factor for each increment step + attempt, Factor float64 + //Jitter eases contention by randomizing backoff steps + Jitter bool + //Min and Max are the minimum and maximum values of the counter + Min, Max time.Duration +} + +// Duration returns the duration for the current attempt before incrementing +// the attempt counter. See ForAttempt. +func (b *Backoff) Duration() time.Duration { + d := b.ForAttempt(b.attempt) + b.attempt++ + return d +} + +const maxInt64 = float64(math.MaxInt64 - 512) + +// ForAttempt returns the duration for a specific attempt. This is useful if +// you have a large number of independent Backoffs, but don't want use +// unnecessary memory storing the Backoff parameters per Backoff. The first +// attempt should be 0. +// +// ForAttempt is concurrent-safe. +func (b *Backoff) ForAttempt(attempt float64) time.Duration { + // Zero-values are nonsensical, so we use + // them to apply defaults + min := b.Min + if min <= 0 { + min = 100 * time.Millisecond + } + max := b.Max + if max <= 0 { + max = 10 * time.Second + } + if min >= max { + // short-circuit + return max + } + factor := b.Factor + if factor <= 0 { + factor = 2 + } + //calculate this duration + minf := float64(min) + durf := minf * math.Pow(factor, attempt) + if b.Jitter { + durf = rand.Float64()*(durf-minf) + minf + } + //ensure float64 wont overflow int64 + if durf > maxInt64 { + return max + } + dur := time.Duration(durf) + //keep within bounds + if dur < min { + return min + } else if dur > max { + return max + } + return dur +} + +// Reset restarts the current attempt counter at zero. +func (b *Backoff) Reset() { + b.attempt = 0 +} + +// Attempt returns the current attempt counter value. +func (b *Backoff) Attempt() float64 { + return b.attempt +}