Adding sync pod latency metric (again).

Latency is broken down by operation type: pod create, pod update, and periodic sync.

Part of #4604.
commit ed0f5885b5
parent a1b1be8eac
Author: Victor Marmol
Date:   2015-02-24 15:29:18 -08:00

4 changed files with 127 additions and 38 deletions
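
The metric itself is wired up in one of the other changed files, which is not shown in this excerpt. As a rough sketch of the shape such a metric could take (not the commit's actual code), a prometheus/client_golang SummaryVec labeled by operation type would let latency be reported separately for create, update, and sync. The names SyncPodLatency and SinceInMicroseconds below are illustrative assumptions:

// A minimal sketch, assuming the prometheus/client_golang library.
package metrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// SyncPodLatency observes how long one syncPod pass takes, partitioned
// by what triggered it: a pod create, a pod update, or a periodic sync.
var SyncPodLatency = prometheus.NewSummaryVec(
	prometheus.SummaryOpts{
		Subsystem: "kubelet",
		Name:      "sync_pod_latency_microseconds",
		Help:      "Latency in microseconds to sync a single pod.",
	},
	[]string{"operation_type"},
)

func init() {
	// Register with the default registry so the kubelet's /metrics
	// endpoint exposes the summary.
	prometheus.MustRegister(SyncPodLatency)
}

// SinceInMicroseconds converts the time elapsed since start into the
// microseconds that Observe expects.
func SinceInMicroseconds(start time.Time) float64 {
	return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

The diff below is what makes recording such a metric possible at all: pod workers gain a completion callback that fires once a sync finishes.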

pkg/kubelet/pod_workers.go

@@ -26,7 +26,7 @@ import (
 	"github.com/golang/glog"
 )
 
-type syncPodFunType func(*api.BoundPod, dockertools.DockerContainers) error
+type syncPodFnType func(*api.BoundPod, dockertools.DockerContainers) error
 
 // TODO(wojtek-t) Add unit tests for this type.
 type podWorkers struct {
@@ -35,7 +35,7 @@ type podWorkers struct {
 	// Tracks all running per-pod goroutines - per-pod goroutine will be
 	// processing updates received through its corresponding channel.
-	podUpdates map[types.UID]chan api.BoundPod
+	podUpdates map[types.UID]chan workUpdate
 	// Track the current state of per-pod goroutines.
 	// Currently all update requests for a given pod that arrive while another
 	// update of this pod is being processed are ignored.
@@ -46,44 +46,55 @@ type podWorkers struct {
 	// This function is run to sync the desired state of the pod.
 	// NOTE: This function has to be thread-safe - it can be called for
 	// different pods at the same time.
-	syncPodFun syncPodFunType
+	syncPodFn syncPodFnType
 }
 
-func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFun syncPodFunType) *podWorkers {
+type workUpdate struct {
+	// The pod state to reflect.
+	pod *api.BoundPod
+
+	// Function to call when the update is complete.
+	updateCompleteFn func()
+}
+
+func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType) *podWorkers {
 	return &podWorkers{
-		podUpdates:  map[types.UID]chan api.BoundPod{},
+		podUpdates:  map[types.UID]chan workUpdate{},
 		isWorking:   map[types.UID]bool{},
 		dockerCache: dockerCache,
-		syncPodFun:  syncPodFun,
+		syncPodFn:   syncPodFn,
 	}
 }
 
-func (p *podWorkers) managePodLoop(podUpdates <-chan api.BoundPod) {
-	for newPod := range podUpdates {
+func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
+	for newWork := range podUpdates {
 		// Since we use docker cache, getting current state shouldn't cause
 		// performance overhead on Docker. Moreover, as long as we run syncPod
 		// no matter if it changes anything, having an old version of "containers"
 		// can cause starting unneeded containers.
 		func() {
-			defer p.setIsWorking(newPod.UID, false)
+			defer p.setIsWorking(newWork.pod.UID, false)
 			containers, err := p.dockerCache.RunningContainers()
 			if err != nil {
 				glog.Errorf("Error listing containers while syncing pod: %v", err)
 				return
 			}
-			err = p.syncPodFun(&newPod, containers)
+			err = p.syncPodFn(newWork.pod, containers)
 			if err != nil {
-				glog.Errorf("Error syncing pod %s, skipping: %v", newPod.UID, err)
-				record.Eventf(&newPod, "failedSync", "Error syncing pod, skipping: %v", err)
+				glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
+				record.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
 				return
 			}
+			newWork.updateCompleteFn()
 		}()
 	}
 }
 
-func (p *podWorkers) UpdatePod(pod api.BoundPod) {
+// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
+func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
 	uid := pod.UID
-	var podUpdates chan api.BoundPod
+	var podUpdates chan workUpdate
 	var exists bool
 
 	p.podLock.Lock()
@@ -91,7 +102,7 @@ func (p *podWorkers) UpdatePod(pod api.BoundPod) {
 	if podUpdates, exists = p.podUpdates[uid]; !exists {
 		// Currently all update requests for a given pod that arrive while another
 		// update of this pod is being processed are ignored.
-		podUpdates = make(chan api.BoundPod, 1)
+		podUpdates = make(chan workUpdate, 1)
 		p.podUpdates[uid] = podUpdates
 		go p.managePodLoop(podUpdates)
 	}
@@ -109,7 +120,10 @@ func (p *podWorkers) UpdatePod(pod api.BoundPod) {
 	// be resent (because we don't drop it)
 	if !p.isWorking[pod.UID] {
 		p.isWorking[pod.UID] = true
-		podUpdates <- pod
+		podUpdates <- workUpdate{
+			pod:              pod,
+			updateCompleteFn: updateComplete,
+		}
 	}
 }
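
The updateCompleteFn hook is what makes the latency measurement possible: the worker invokes it only after syncPodFn succeeds, so a caller can close over a start time. A sketch of what a call site could look like under that assumption (the commit's real call sites live in the kubelet files not shown in this excerpt, and the metrics helpers are the hypothetical ones sketched above):

// Illustrative only; assumes the metrics sketch earlier on this page.
start := time.Now()
podWorkers.UpdatePod(&pod, func() {
	// Record how long this update took, labeled by operation type.
	metrics.SyncPodLatency.WithLabelValues("update").Observe(metrics.SinceInMicroseconds(start))
})

Because the channel is buffered with size 1 and guarded by isWorking, at most one update per pod is queued while another is in flight, so each observed latency corresponds to exactly one completed syncPod pass.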