rkt: Implement pod FinishedAt

This is implemented via touching a file on stop as a hook in the systemd
unit. The ctime of this file is then used to get the `finishedAt` time
in the future.
In addition, this changes the `startedAt` and `createdAt` to use the api
server's results rather than the annotations it previously used.

It's possible we might want to move this into the api in the future.

Fixes #23887
This commit is contained in:
Euan Kemp
2016-04-14 18:01:40 -07:00
parent 82f3ec14fb
commit a6718f5969
9 changed files with 306 additions and 85 deletions

View File

@@ -25,6 +25,7 @@ import (
"os"
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
@@ -76,13 +77,11 @@ const (
unitRktID = "RktID"
unitRestartCount = "RestartCount"
k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet"
k8sRktKubeletAnnoValue = "true"
k8sRktUIDAnno = "rkt.kubernetes.io/uid"
k8sRktNameAnno = "rkt.kubernetes.io/name"
k8sRktNamespaceAnno = "rkt.kubernetes.io/namespace"
//TODO: remove the creation time annotation once this is closed: https://github.com/coreos/rkt/issues/1789
k8sRktCreationTimeAnno = "rkt.kubernetes.io/created"
k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet"
k8sRktKubeletAnnoValue = "true"
k8sRktUIDAnno = "rkt.kubernetes.io/uid"
k8sRktNameAnno = "rkt.kubernetes.io/name"
k8sRktNamespaceAnno = "rkt.kubernetes.io/namespace"
k8sRktContainerHashAnno = "rkt.kubernetes.io/container-hash"
k8sRktRestartCountAnno = "rkt.kubernetes.io/restart-count"
k8sRktTerminationMessagePathAnno = "rkt.kubernetes.io/termination-message-path"
@@ -128,6 +127,11 @@ type Runtime struct {
volumeGetter volumeGetter
imagePuller kubecontainer.ImagePuller
runner kubecontainer.HandlerRunner
execer utilexec.Interface
os kubecontainer.OSInterface
// used for a systemd Exec, which requires the full path.
touchPath string
versions versions
}
@@ -151,6 +155,8 @@ func New(
livenessManager proberesults.Manager,
volumeGetter volumeGetter,
httpClient kubetypes.HttpGetter,
execer utilexec.Interface,
os kubecontainer.OSInterface,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
) (*Runtime, error) {
@@ -170,12 +176,17 @@ func New(
if config.Path == "" {
// No default rkt path was set, so try to find one in $PATH.
var err error
config.Path, err = exec.LookPath("rkt")
config.Path, err = execer.LookPath("rkt")
if err != nil {
return nil, fmt.Errorf("cannot find rkt binary: %v", err)
}
}
touchPath, err := execer.LookPath("touch")
if err != nil {
return nil, fmt.Errorf("cannot find touch binary: %v", err)
}
rkt := &Runtime{
systemd: systemd,
apisvcConn: apisvcConn,
@@ -187,6 +198,8 @@ func New(
recorder: recorder,
livenessManager: livenessManager,
volumeGetter: volumeGetter,
execer: execer,
touchPath: touchPath,
}
rkt.config, err = rkt.getConfig(rkt.config)
@@ -541,7 +554,6 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktUIDAnno), string(pod.UID))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNameAnno), pod.Name)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNamespaceAnno), pod.Namespace)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktCreationTimeAnno), strconv.FormatInt(time.Now().Unix(), 10))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktRestartCountAnno), strconv.Itoa(restartCount))
for _, c := range pod.Spec.Containers {
@@ -587,6 +599,34 @@ func makeHostNetworkMount(opts *kubecontainer.RunContainerOptions) (*kubecontain
return &hostsMount, &resolvMount
}
// podFinishedMarkerPath returns the path to a file which should be used to
// indicate the pod exiting, and the time thereof.
// If the file at the path does not exist, the pod should not be exited. If it
// does exist, then the ctime of the file should indicate the time the pod
// exited.
func podFinishedMarkerPath(podDir string, rktUID string) string {
return filepath.Join(podDir, "finished-"+rktUID)
}
func podFinishedMarkCommand(touchPath, podDir, rktUID string) string {
// TODO, if the path has a `'` character in it, this breaks.
return touchPath + " " + podFinishedMarkerPath(podDir, rktUID)
}
// podFinishedAt returns the time that a pod exited, or a zero time if it has
// not.
func (r *Runtime) podFinishedAt(podUID types.UID, rktUID string) time.Time {
markerFile := podFinishedMarkerPath(r.runtimeHelper.GetPodDir(podUID), rktUID)
stat, err := r.os.Stat(markerFile)
if err != nil {
if !os.IsNotExist(err) {
glog.Warningf("rkt: unexpected fs error checking pod finished marker: %v", err)
}
return time.Time{}
}
return stat.ModTime()
}
func makeContainerLogMount(opts *kubecontainer.RunContainerOptions, container *api.Container) (*kubecontainer.Mount, error) {
if opts.PodContainerDir == "" || container.TerminationMessagePath == "" {
return nil, nil
@@ -884,8 +924,11 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
// TODO handle pod.Spec.HostPID
// TODO handle pod.Spec.HostIPC
// TODO per container finishedAt, not just per pod
markPodFinished := podFinishedMarkCommand(r.touchPath, r.runtimeHelper.GetPodDir(pod.UID), uuid)
units := []*unit.UnitOption{
newUnitOption("Service", "ExecStart", runPrepared),
newUnitOption("Service", "ExecStopPost", markPodFinished),
// This enables graceful stop.
newUnitOption("Service", "KillMode", "mixed"),
}
@@ -1045,14 +1088,6 @@ func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNamespaceAnno)
}
podCreatedString, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktCreationTimeAnno)
}
podCreated, err := strconv.ParseInt(podCreatedString, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse pod creation timestamp: %v", err)
}
kubepod := &kubecontainer.Pod{
ID: types.UID(podUID),
@@ -1078,8 +1113,8 @@ func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error)
// By default, the version returned by rkt API service will be "latest" if not specified.
Image: fmt.Sprintf("%s:%s", app.Image.Name, app.Image.Version),
Hash: containerHash,
Created: podCreated,
State: appStateToContainerState(app.State),
Created: time.Unix(0, rktpod.CreatedAt).Unix(), // convert ns to s
})
}
@@ -1526,7 +1561,7 @@ func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerStat
}
// getPodInfo returns the pod manifest, creation time and restart count of the pod.
func getPodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationTime time.Time, restartCount int, err error) {
func getPodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, restartCount int, err error) {
// TODO(yifan): The manifest is only used for getting the annotations.
// Consider to let the server to unmarshal the annotations.
var manifest appcschema.PodManifest
@@ -1534,16 +1569,6 @@ func getPodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationT
return
}
creationTimeStr, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
if !ok {
err = fmt.Errorf("no creation timestamp in pod manifest")
return
}
unixSec, err := strconv.ParseInt(creationTimeStr, 10, 64)
if err != nil {
return
}
if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
restartCount, err = strconv.Atoi(countString)
if err != nil {
@@ -1551,11 +1576,11 @@ func getPodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationT
}
}
return &manifest, time.Unix(unixSec, 0), restartCount, nil
return &manifest, restartCount, nil
}
// populateContainerStatus fills the container status according to the app's information.
func populateContainerStatus(pod rktapi.Pod, app rktapi.App, runtimeApp appcschema.RuntimeApp, restartCount int, creationTime time.Time) (*kubecontainer.ContainerStatus, error) {
func populateContainerStatus(pod rktapi.Pod, app rktapi.App, runtimeApp appcschema.RuntimeApp, restartCount int, finishedTime time.Time) (*kubecontainer.ContainerStatus, error) {
hashStr, ok := runtimeApp.Annotations.Get(k8sRktContainerHashAnno)
if !ok {
return nil, fmt.Errorf("No container hash in pod manifest")
@@ -1584,14 +1609,17 @@ func populateContainerStatus(pod rktapi.Pod, app rktapi.App, runtimeApp appcsche
}
}
createdTime := time.Unix(0, pod.CreatedAt)
startedTime := time.Unix(0, pod.StartedAt)
return &kubecontainer.ContainerStatus{
ID: buildContainerID(&containerID{uuid: pod.Id, appName: app.Name}),
Name: app.Name,
State: appStateToContainerState(app.State),
// TODO(yifan): Use the creation/start/finished timestamp when it's implemented.
CreatedAt: creationTime,
StartedAt: creationTime,
ExitCode: int(app.ExitCode),
ID: buildContainerID(&containerID{uuid: pod.Id, appName: app.Name}),
Name: app.Name,
State: appStateToContainerState(app.State),
CreatedAt: createdTime,
StartedAt: startedTime,
FinishedAt: finishedTime,
ExitCode: int(app.ExitCode),
// By default, the version returned by rkt API service will be "latest" if not specified.
Image: fmt.Sprintf("%s:%s", app.Image.Name, app.Image.Version),
ImageID: "rkt://" + app.Image.Id, // TODO(yifan): Add the prefix only in api.PodStatus.
@@ -1605,6 +1633,13 @@ func populateContainerStatus(pod rktapi.Pod, app rktapi.App, runtimeApp appcsche
}, nil
}
// GetPodStatus returns the status for a pod specified by a given UID, name,
// and namespace. It will attempt to find pod's information via a request to
// the rkt api server.
// An error will be returned if the api server returns an error. If the api
// server doesn't error, but doesn't provide meaningful information about the
// pod, a status with no information (other than the passed in arguments) is
// returned anyways.
func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
podStatus := &kubecontainer.PodStatus{
ID: uid,
@@ -1626,7 +1661,7 @@ func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecont
// In this loop, we group all containers from all pods together,
// also we try to find the latest pod, so we can fill other info of the pod below.
for _, pod := range listResp.Pods {
manifest, creationTime, restartCount, err := getPodInfo(pod)
manifest, restartCount, err := getPodInfo(pod)
if err != nil {
glog.Warningf("rkt: Couldn't get necessary info from the rkt pod, (uuid %q): %v", pod.Id, err)
continue
@@ -1637,11 +1672,10 @@ func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecont
latestRestartCount = restartCount
}
finishedTime := r.podFinishedAt(uid, pod.Id)
for i, app := range pod.Apps {
// The order of the apps is determined by the rkt pod manifest.
// TODO(yifan): Save creationTime, restartCount in each app's annotation,
// so we don't need to pass them.
cs, err := populateContainerStatus(*pod, *app, manifest.Apps[i], restartCount, creationTime)
cs, err := populateContainerStatus(*pod, *app, manifest.Apps[i], restartCount, finishedTime)
if err != nil {
glog.Warningf("rkt: Failed to populate container status(uuid %q, app %q): %v", pod.Id, app.Name, err)
continue