Merge pull request #59 from Random-Liu/backoff-when-lose-containerd-connection

Retry and backoff when lost connection with containerd.
This commit is contained in:
Lantao Liu 2017-05-31 14:53:02 -07:00 committed by GitHub
commit 61cee6cf12
7 changed files with 273 additions and 25 deletions

4
Godeps/Godeps.json generated
View File

@ -242,6 +242,10 @@
"ImportPath": "github.com/golang/protobuf/ptypes/empty",
"Rev": "8ee79997227bf9b34611aee7946ae64735e6fd93"
},
{
"ImportPath": "github.com/jpillora/backoff",
"Rev": "06c7a16c845dc8e0bf575fafeeca0f5462f5eb4d"
},
{
"ImportPath": "github.com/kubernetes-incubator/cri-o/pkg/ocicni",
"Rev": "f648cd6e60948e4da391040e5c75d8175fea4fb7"

View File

@ -48,9 +48,7 @@ func main() {
if err != nil {
glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err)
}
if err := service.Start(); err != nil {
glog.Exitf("Failed to start CRI containerd service: %v", err)
}
service.Start()
s := server.NewCRIContainerdServer(o.SocketPath, service, service)
if err := s.Run(); err != nil {

View File

@ -17,47 +17,65 @@ limitations under the License.
package server
import (
"github.com/golang/glog"
"golang.org/x/net/context"
"time"
"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/container"
"github.com/golang/glog"
"github.com/jpillora/backoff"
"golang.org/x/net/context"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
const (
// minRetryInterval is the minimum retry interval used when the connection to containerd is lost.
minRetryInterval = 100 * time.Millisecond
// maxRetryInterval is the maximum retry interval used when the connection to containerd is lost.
maxRetryInterval = 30 * time.Second
// exponentialFactor is the exponential backoff factor.
exponentialFactor = 2.0
)
// startEventMonitor starts an event monitor which monitors and handles all
// container events.
// TODO(random-liu): [P1] Figure out:
// 1) Is it possible to drop event during containerd is running?
// 2) How to deal with containerd down? We should restart event monitor, and
// we should recover all container state.
func (c *criContainerdService) startEventMonitor() error {
events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{})
if err != nil {
return err
// TODO(random-liu): [P1] Is it possible for events to be dropped while containerd is running?
func (c *criContainerdService) startEventMonitor() {
b := backoff.Backoff{
Min: minRetryInterval,
Max: maxRetryInterval,
Factor: exponentialFactor,
}
go func() {
for {
c.handleEventStream(events)
events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{})
if err != nil {
glog.Errorf("Failed to connect to containerd event stream: %v", err)
time.Sleep(b.Duration())
continue
}
// Successfully connected to containerd; reset the backoff.
b.Reset()
// TODO(random-liu): Relist to recover state, should prevent other operations
// until state is fully recovered.
if err := c.handleEventStream(events); err != nil {
glog.Errorf("Failed to handle event stream: %v", err)
time.Sleep(b.Duration())
continue
}
}
}()
return nil
}
// handleEventStream receives an event from containerd and handles the event.
func (c *criContainerdService) handleEventStream(events execution.ContainerService_EventsClient) {
// TODO(random-liu): [P1] Should backoff on this error, or else this will
// cause a busy loop.
// TODO(random-liu): Handle io.EOF.
func (c *criContainerdService) handleEventStream(events execution.ContainerService_EventsClient) error {
e, err := events.Recv()
if err != nil {
glog.Errorf("Failed to receive event: %v", err)
return
return err
}
glog.V(2).Infof("Received container event: %+v", e)
c.handleEvent(e)
return
return nil
}
// handleEvent handles a containerd event.

View File

@ -54,7 +54,7 @@ import (
// CRIContainerdService is the interface that implements the CRI remote service server.
type CRIContainerdService interface {
Start() error
Start()
runtime.RuntimeServiceServer
runtime.ImageServiceServer
}
@ -128,6 +128,6 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir
return c, nil
}
func (c *criContainerdService) Start() error {
return c.startEventMonitor()
func (c *criContainerdService) Start() {
c.startEventMonitor()
}

21
vendor/github.com/jpillora/backoff/LICENSE generated vendored Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2017 Jaime Pillora
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

119
vendor/github.com/jpillora/backoff/README.md generated vendored Normal file
View File

@ -0,0 +1,119 @@
# Backoff
A simple exponential backoff counter in Go (Golang)
[![GoDoc](https://godoc.org/github.com/jpillora/backoff?status.svg)](https://godoc.org/github.com/jpillora/backoff) [![Circle CI](https://circleci.com/gh/jpillora/backoff.svg?style=shield)](https://circleci.com/gh/jpillora/backoff)
### Install
```
$ go get -v github.com/jpillora/backoff
```
### Usage
Backoff is a `time.Duration` counter. It starts at `Min`. After every call to `Duration()` it is multiplied by `Factor`. It is capped at `Max`. It returns to `Min` on every call to `Reset()`. `Jitter` adds randomness ([see below](#example-using-jitter)). Used in conjunction with the `time` package.
---
#### Simple example
``` go
b := &backoff.Backoff{
//These are the defaults
Min: 100 * time.Millisecond,
Max: 10 * time.Second,
Factor: 2,
Jitter: false,
}
fmt.Printf("%s\n", b.Duration())
fmt.Printf("%s\n", b.Duration())
fmt.Printf("%s\n", b.Duration())
fmt.Printf("Reset!\n")
b.Reset()
fmt.Printf("%s\n", b.Duration())
```
```
100ms
200ms
400ms
Reset!
100ms
```
---
#### Example using `net` package
``` go
b := &backoff.Backoff{
Max: 5 * time.Minute,
}
for {
conn, err := net.Dial("tcp", "example.com:5309")
if err != nil {
d := b.Duration()
fmt.Printf("%s, reconnecting in %s", err, d)
time.Sleep(d)
continue
}
//connected
b.Reset()
conn.Write([]byte("hello world!"))
// ... Read ... Write ... etc
conn.Close()
//disconnected
}
```
---
#### Example using `Jitter`
Enabling `Jitter` adds some randomization to the backoff durations. [See Amazon's writeup of performance gains using jitter](http://www.awsarchitectureblog.com/2015/03/backoff.html). Seeding is not necessary but doing so gives repeatable results.
```go
import "math/rand"
b := &backoff.Backoff{
Jitter: true,
}
rand.Seed(42)
fmt.Printf("%s\n", b.Duration())
fmt.Printf("%s\n", b.Duration())
fmt.Printf("%s\n", b.Duration())
fmt.Printf("Reset!\n")
b.Reset()
fmt.Printf("%s\n", b.Duration())
fmt.Printf("%s\n", b.Duration())
fmt.Printf("%s\n", b.Duration())
```
```
100ms
106.600049ms
281.228155ms
Reset!
100ms
104.381845ms
214.957989ms
```
#### Documentation
https://godoc.org/github.com/jpillora/backoff
#### Credits
Forked from some JavaScript written by [@tj](https://github.com/tj)

88
vendor/github.com/jpillora/backoff/backoff.go generated vendored Normal file
View File

@ -0,0 +1,88 @@
// Package backoff provides an exponential-backoff implementation.
package backoff
import (
"math"
"math/rand"
"time"
)
// Backoff is an exponentially growing time.Duration counter. The first
// value handed out by Duration is Min; each subsequent call scales the
// value by Factor, and the result is never allowed to exceed Max.
//
// Backoff is not generally concurrent-safe, but the ForAttempt method can
// be used concurrently.
type Backoff struct {
	// attempt counts the calls to Duration since the last Reset.
	attempt float64
	// Factor is the multiplier applied for every additional attempt.
	// Values <= 0 select the default of 2.
	Factor float64
	// Jitter, when set, randomizes each step between Min and the
	// un-jittered value to ease contention.
	Jitter bool
	// Min and Max bound the values the counter can produce. Values <= 0
	// select the defaults of 100ms and 10s respectively.
	Min, Max time.Duration
}

// Duration reports the duration for the current attempt and then advances
// the attempt counter by one. See ForAttempt for how the value is derived.
func (b *Backoff) Duration() time.Duration {
	current := b.attempt
	b.attempt = current + 1
	return b.ForAttempt(current)
}

// maxInt64 is the largest float64 duration ForAttempt will convert to an
// integer; anything above it is reported as Max instead. It sits slightly
// below math.MaxInt64, presumably because float64 cannot represent
// math.MaxInt64 exactly.
const maxInt64 = float64(math.MaxInt64 - 512)

// ForAttempt computes the duration for the given attempt number without
// touching the attempt counter. This is useful if you have a large number
// of independent Backoffs, but don't want to use unnecessary memory storing
// the Backoff parameters per Backoff. The first attempt should be 0.
//
// ForAttempt is concurrent-safe.
func (b *Backoff) ForAttempt(attempt float64) time.Duration {
	// Non-positive settings are meaningless, so they select the defaults.
	lo, hi, mult := b.Min, b.Max, b.Factor
	if lo <= 0 {
		lo = 100 * time.Millisecond
	}
	if hi <= 0 {
		hi = 10 * time.Second
	}
	if lo >= hi {
		// Degenerate range: there is no room to grow, so report the cap.
		return hi
	}
	if mult <= 0 {
		mult = 2
	}

	// Exponential growth: lo * mult^attempt, computed in floating point.
	base := float64(lo)
	scaled := base * math.Pow(mult, attempt)
	if b.Jitter {
		// Pick a random point between the minimum and the scaled value.
		scaled = rand.Float64()*(scaled-base) + base
	}

	// Converting a value this large would overflow int64; saturate at the cap.
	if scaled > maxInt64 {
		return hi
	}

	// Clamp the converted duration into [lo, hi].
	switch d := time.Duration(scaled); {
	case d < lo:
		return lo
	case d > hi:
		return hi
	default:
		return d
	}
}

// Reset rewinds the attempt counter to zero, so the next call to Duration
// starts over from the first attempt.
func (b *Backoff) Reset() {
	b.attempt = 0
}

// Attempt reports the current value of the attempt counter.
func (b *Backoff) Attempt() float64 {
	return b.attempt
}