Retry and backoff when lost connection with containerd.
Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
parent
6e27320f40
commit
0179d0fbaf
4
Godeps/Godeps.json
generated
4
Godeps/Godeps.json
generated
@ -242,6 +242,10 @@
|
|||||||
"ImportPath": "github.com/golang/protobuf/ptypes/empty",
|
"ImportPath": "github.com/golang/protobuf/ptypes/empty",
|
||||||
"Rev": "8ee79997227bf9b34611aee7946ae64735e6fd93"
|
"Rev": "8ee79997227bf9b34611aee7946ae64735e6fd93"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/jpillora/backoff",
|
||||||
|
"Rev": "06c7a16c845dc8e0bf575fafeeca0f5462f5eb4d"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/kubernetes-incubator/cri-o/pkg/ocicni",
|
"ImportPath": "github.com/kubernetes-incubator/cri-o/pkg/ocicni",
|
||||||
"Rev": "f648cd6e60948e4da391040e5c75d8175fea4fb7"
|
"Rev": "f648cd6e60948e4da391040e5c75d8175fea4fb7"
|
||||||
|
@ -48,9 +48,7 @@ func main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err)
|
glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err)
|
||||||
}
|
}
|
||||||
if err := service.Start(); err != nil {
|
service.Start()
|
||||||
glog.Exitf("Failed to start CRI containerd service: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s := server.NewCRIContainerdServer(o.SocketPath, service, service)
|
s := server.NewCRIContainerdServer(o.SocketPath, service, service)
|
||||||
if err := s.Run(); err != nil {
|
if err := s.Run(); err != nil {
|
||||||
|
@ -17,47 +17,65 @@ limitations under the License.
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/golang/glog"
|
"time"
|
||||||
"golang.org/x/net/context"
|
|
||||||
|
|
||||||
"github.com/containerd/containerd/api/services/execution"
|
"github.com/containerd/containerd/api/services/execution"
|
||||||
"github.com/containerd/containerd/api/types/container"
|
"github.com/containerd/containerd/api/types/container"
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"github.com/jpillora/backoff"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
|
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// minRetryInterval is the minimum retry interval when lost connection with containerd.
|
||||||
|
minRetryInterval = 100 * time.Millisecond
|
||||||
|
// maxRetryInterval is the maximum retry interval when lost connection with containerd.
|
||||||
|
maxRetryInterval = 30 * time.Second
|
||||||
|
// exponentialFactor is the exponential backoff factor.
|
||||||
|
exponentialFactor = 2.0
|
||||||
|
)
|
||||||
|
|
||||||
// startEventMonitor starts an event monitor which monitors and handles all
|
// startEventMonitor starts an event monitor which monitors and handles all
|
||||||
// container events.
|
// container events.
|
||||||
// TODO(random-liu): [P1] Figure out:
|
// TODO(random-liu): [P1] Is it possible to drop event during containerd is running?
|
||||||
// 1) Is it possible to drop event during containerd is running?
|
func (c *criContainerdService) startEventMonitor() {
|
||||||
// 2) How to deal with containerd down? We should restart event monitor, and
|
b := backoff.Backoff{
|
||||||
// we should recover all container state.
|
Min: minRetryInterval,
|
||||||
func (c *criContainerdService) startEventMonitor() error {
|
Max: maxRetryInterval,
|
||||||
events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{})
|
Factor: exponentialFactor,
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
c.handleEventStream(events)
|
events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{})
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to connect to containerd event stream: %v", err)
|
||||||
|
time.Sleep(b.Duration())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Successfully connect with containerd, reset backoff.
|
||||||
|
b.Reset()
|
||||||
|
// TODO(random-liu): Relist to recover state, should prevent other operations
|
||||||
|
// until state is fully recovered.
|
||||||
|
if err := c.handleEventStream(events); err != nil {
|
||||||
|
glog.Errorf("Failed to handle event stream: %v", err)
|
||||||
|
time.Sleep(b.Duration())
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleEventStream receives an event from containerd and handles the event.
|
// handleEventStream receives an event from containerd and handles the event.
|
||||||
func (c *criContainerdService) handleEventStream(events execution.ContainerService_EventsClient) {
|
func (c *criContainerdService) handleEventStream(events execution.ContainerService_EventsClient) error {
|
||||||
// TODO(random-liu): [P1] Should backoff on this error, or else this will
|
|
||||||
// cause a busy loop.
|
|
||||||
// TODO(random-liu): Handle io.EOF.
|
|
||||||
e, err := events.Recv()
|
e, err := events.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to receive event: %v", err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
glog.V(2).Infof("Received container event: %+v", e)
|
glog.V(2).Infof("Received container event: %+v", e)
|
||||||
c.handleEvent(e)
|
c.handleEvent(e)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleEvent handles a containerd event.
|
// handleEvent handles a containerd event.
|
||||||
|
@ -54,7 +54,7 @@ import (
|
|||||||
|
|
||||||
// CRIContainerdService is the interface implement CRI remote service server.
|
// CRIContainerdService is the interface implement CRI remote service server.
|
||||||
type CRIContainerdService interface {
|
type CRIContainerdService interface {
|
||||||
Start() error
|
Start()
|
||||||
runtime.RuntimeServiceServer
|
runtime.RuntimeServiceServer
|
||||||
runtime.ImageServiceServer
|
runtime.ImageServiceServer
|
||||||
}
|
}
|
||||||
@ -128,6 +128,6 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir
|
|||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *criContainerdService) Start() error {
|
func (c *criContainerdService) Start() {
|
||||||
return c.startEventMonitor()
|
c.startEventMonitor()
|
||||||
}
|
}
|
||||||
|
21
vendor/github.com/jpillora/backoff/LICENSE
generated
vendored
Normal file
21
vendor/github.com/jpillora/backoff/LICENSE
generated
vendored
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
The MIT License (MIT)
|
||||||
|
|
||||||
|
Copyright (c) 2017 Jaime Pillora
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
SOFTWARE.
|
119
vendor/github.com/jpillora/backoff/README.md
generated
vendored
Normal file
119
vendor/github.com/jpillora/backoff/README.md
generated
vendored
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
# Backoff
|
||||||
|
|
||||||
|
A simple exponential backoff counter in Go (Golang)
|
||||||
|
|
||||||
|
[](https://godoc.org/github.com/jpillora/backoff) [](https://circleci.com/gh/jpillora/backoff)
|
||||||
|
|
||||||
|
### Install
|
||||||
|
|
||||||
|
```
|
||||||
|
$ go get -v github.com/jpillora/backoff
|
||||||
|
```
|
||||||
|
|
||||||
|
### Usage
|
||||||
|
|
||||||
|
Backoff is a `time.Duration` counter. It starts at `Min`. After every call to `Duration()` it is multiplied by `Factor`. It is capped at `Max`. It returns to `Min` on every call to `Reset()`. `Jitter` adds randomness ([see below](#example-using-jitter)). Used in conjunction with the `time` package.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### Simple example
|
||||||
|
|
||||||
|
``` go
|
||||||
|
|
||||||
|
b := &backoff.Backoff{
|
||||||
|
//These are the defaults
|
||||||
|
Min: 100 * time.Millisecond,
|
||||||
|
Max: 10 * time.Second,
|
||||||
|
Factor: 2,
|
||||||
|
Jitter: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
|
||||||
|
fmt.Printf("Reset!\n")
|
||||||
|
b.Reset()
|
||||||
|
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
```
|
||||||
|
|
||||||
|
```
|
||||||
|
100ms
|
||||||
|
200ms
|
||||||
|
400ms
|
||||||
|
Reset!
|
||||||
|
100ms
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### Example using `net` package
|
||||||
|
|
||||||
|
``` go
|
||||||
|
b := &backoff.Backoff{
|
||||||
|
Max: 5 * time.Minute,
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
conn, err := net.Dial("tcp", "example.com:5309")
|
||||||
|
if err != nil {
|
||||||
|
d := b.Duration()
|
||||||
|
fmt.Printf("%s, reconnecting in %s", err, d)
|
||||||
|
time.Sleep(d)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
//connected
|
||||||
|
b.Reset()
|
||||||
|
conn.Write([]byte("hello world!"))
|
||||||
|
// ... Read ... Write ... etc
|
||||||
|
conn.Close()
|
||||||
|
//disconnected
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### Example using `Jitter`
|
||||||
|
|
||||||
|
Enabling `Jitter` adds some randomization to the backoff durations. [See Amazon's writeup of performance gains using jitter](http://www.awsarchitectureblog.com/2015/03/backoff.html). Seeding is not necessary but doing so gives repeatable results.
|
||||||
|
|
||||||
|
```go
|
||||||
|
import "math/rand"
|
||||||
|
|
||||||
|
b := &backoff.Backoff{
|
||||||
|
Jitter: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
rand.Seed(42)
|
||||||
|
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
|
||||||
|
fmt.Printf("Reset!\n")
|
||||||
|
b.Reset()
|
||||||
|
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
fmt.Printf("%s\n", b.Duration())
|
||||||
|
```
|
||||||
|
|
||||||
|
```
|
||||||
|
100ms
|
||||||
|
106.600049ms
|
||||||
|
281.228155ms
|
||||||
|
Reset!
|
||||||
|
100ms
|
||||||
|
104.381845ms
|
||||||
|
214.957989ms
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Documentation
|
||||||
|
|
||||||
|
https://godoc.org/github.com/jpillora/backoff
|
||||||
|
|
||||||
|
#### Credits
|
||||||
|
|
||||||
|
Forked from some JavaScript written by [@tj](https://github.com/tj)
|
88
vendor/github.com/jpillora/backoff/backoff.go
generated
vendored
Normal file
88
vendor/github.com/jpillora/backoff/backoff.go
generated
vendored
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
// Package backoff provides an exponential-backoff implementation.
|
||||||
|
package backoff
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"math/rand"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Backoff is a time.Duration counter, starting at Min. After every call to
|
||||||
|
// the Duration method the current timing is multiplied by Factor, but it
|
||||||
|
// never exceeds Max.
|
||||||
|
//
|
||||||
|
// Backoff is not generally concurrent-safe, but the ForAttempt method can
|
||||||
|
// be used concurrently.
|
||||||
|
type Backoff struct {
|
||||||
|
//Factor is the multiplying factor for each increment step
|
||||||
|
attempt, Factor float64
|
||||||
|
//Jitter eases contention by randomizing backoff steps
|
||||||
|
Jitter bool
|
||||||
|
//Min and Max are the minimum and maximum values of the counter
|
||||||
|
Min, Max time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Duration returns the duration for the current attempt before incrementing
|
||||||
|
// the attempt counter. See ForAttempt.
|
||||||
|
func (b *Backoff) Duration() time.Duration {
|
||||||
|
d := b.ForAttempt(b.attempt)
|
||||||
|
b.attempt++
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxInt64 = float64(math.MaxInt64 - 512)
|
||||||
|
|
||||||
|
// ForAttempt returns the duration for a specific attempt. This is useful if
|
||||||
|
// you have a large number of independent Backoffs, but don't want use
|
||||||
|
// unnecessary memory storing the Backoff parameters per Backoff. The first
|
||||||
|
// attempt should be 0.
|
||||||
|
//
|
||||||
|
// ForAttempt is concurrent-safe.
|
||||||
|
func (b *Backoff) ForAttempt(attempt float64) time.Duration {
|
||||||
|
// Zero-values are nonsensical, so we use
|
||||||
|
// them to apply defaults
|
||||||
|
min := b.Min
|
||||||
|
if min <= 0 {
|
||||||
|
min = 100 * time.Millisecond
|
||||||
|
}
|
||||||
|
max := b.Max
|
||||||
|
if max <= 0 {
|
||||||
|
max = 10 * time.Second
|
||||||
|
}
|
||||||
|
if min >= max {
|
||||||
|
// short-circuit
|
||||||
|
return max
|
||||||
|
}
|
||||||
|
factor := b.Factor
|
||||||
|
if factor <= 0 {
|
||||||
|
factor = 2
|
||||||
|
}
|
||||||
|
//calculate this duration
|
||||||
|
minf := float64(min)
|
||||||
|
durf := minf * math.Pow(factor, attempt)
|
||||||
|
if b.Jitter {
|
||||||
|
durf = rand.Float64()*(durf-minf) + minf
|
||||||
|
}
|
||||||
|
//ensure float64 wont overflow int64
|
||||||
|
if durf > maxInt64 {
|
||||||
|
return max
|
||||||
|
}
|
||||||
|
dur := time.Duration(durf)
|
||||||
|
//keep within bounds
|
||||||
|
if dur < min {
|
||||||
|
return min
|
||||||
|
} else if dur > max {
|
||||||
|
return max
|
||||||
|
}
|
||||||
|
return dur
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset restarts the current attempt counter at zero.
|
||||||
|
func (b *Backoff) Reset() {
|
||||||
|
b.attempt = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt returns the current attempt counter value.
|
||||||
|
func (b *Backoff) Attempt() float64 {
|
||||||
|
return b.attempt
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user