Merge pull request #7589 from yifan-gu/rkt_runpod
kubelet/rkt: Add RunPod() for rkt.
This commit is contained in:
@@ -19,17 +19,27 @@ package rkt
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash/adler32"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
|
||||
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
|
||||
appcschema "github.com/appc/spec/schema"
|
||||
appctypes "github.com/appc/spec/schema/types"
|
||||
"github.com/coreos/go-systemd/dbus"
|
||||
"github.com/coreos/go-systemd/unit"
|
||||
"github.com/coreos/rkt/store"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
@@ -382,3 +392,146 @@ func (r *Runtime) getImageID(imageName string) (string, error) {
|
||||
}
|
||||
return last, nil
|
||||
}
|
||||
|
||||
func newUnitOption(section, name, value string) *unit.UnitOption {
|
||||
return &unit.UnitOption{Section: section, Name: name, Value: value}
|
||||
}
|
||||
|
||||
// TODO(yifan): Move this duplicated function to container runtime.
|
||||
// hashContainer computes the hash of one api.Container.
|
||||
func hashContainer(container *api.Container) uint64 {
|
||||
hash := adler32.New()
|
||||
util.DeepHashObject(hash, *container)
|
||||
return uint64(hash.Sum32())
|
||||
}
|
||||
|
||||
// TODO(yifan): Remove the receiver once we can solve the appName->imageID problem.
|
||||
func (r *Runtime) apiPodToRuntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod {
|
||||
p := &kubecontainer.Pod{
|
||||
ID: pod.UID,
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
}
|
||||
for i := range pod.Spec.Containers {
|
||||
c := &pod.Spec.Containers[i]
|
||||
imageID, err := r.getImageID(c.Image)
|
||||
if err != nil {
|
||||
glog.Warningf("rkt: Cannot get image id: %v", err)
|
||||
}
|
||||
p.Containers = append(p.Containers, &kubecontainer.Container{
|
||||
ID: types.UID(buildContainerID(&containerID{uuid, c.Name, imageID})),
|
||||
Name: c.Name,
|
||||
Image: c.Image,
|
||||
Hash: hashContainer(c),
|
||||
Created: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// preparePod will:
|
||||
//
|
||||
// 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid.
|
||||
// 2. Creates the unit file and save it under systemdUnitDir.
|
||||
//
|
||||
// On success, it will return a string that represents name of the unit file
|
||||
// and a boolean that indicates if the unit file needs to be reloaded (whether
|
||||
// the file is already existed).
|
||||
func (r *Runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (string, bool, error) {
|
||||
cmds := []string{"prepare", "--quiet", "--pod-manifest"}
|
||||
|
||||
// Generate the pod manifest from the pod spec.
|
||||
manifest, err := r.makePodManifest(pod, volumeMap)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
manifestFile, err := ioutil.TempFile("", "manifest")
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
defer func() {
|
||||
manifestFile.Close()
|
||||
if err := os.Remove(manifestFile.Name()); err != nil {
|
||||
glog.Warningf("rkt: Cannot remove temp manifest file %q: %v", manifestFile.Name(), err)
|
||||
}
|
||||
}()
|
||||
|
||||
data, err := json.Marshal(manifest)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
// Since File.Write returns error if the written length is less than len(data),
|
||||
// so check error is enough for us.
|
||||
if _, err := manifestFile.Write(data); err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
|
||||
cmds = append(cmds, manifestFile.Name())
|
||||
output, err := r.runCommand(cmds...)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
if len(output) != 1 {
|
||||
return "", false, fmt.Errorf("cannot get uuid from 'rkt prepare'")
|
||||
}
|
||||
uuid := output[0]
|
||||
glog.V(4).Infof("'rkt prepare' returns %q.", uuid)
|
||||
|
||||
p := r.apiPodToRuntimePod(uuid, pod)
|
||||
b, err := json.Marshal(p)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
|
||||
runPrepared := fmt.Sprintf("%s run-prepared --private-net=%v %s", r.absPath, pod.Spec.HostNetwork, uuid)
|
||||
units := []*unit.UnitOption{
|
||||
newUnitOption(unitKubernetesSection, unitRktID, uuid),
|
||||
newUnitOption(unitKubernetesSection, unitPodName, string(b)),
|
||||
newUnitOption("Service", "ExecStart", runPrepared),
|
||||
}
|
||||
|
||||
// Save the unit file under systemd's service directory.
|
||||
// TODO(yifan) Garbage collect 'dead' service files.
|
||||
needReload := false
|
||||
unitName := makePodServiceFileName(pod.UID)
|
||||
if _, err := os.Stat(path.Join(systemdServiceDir, unitName)); err == nil {
|
||||
needReload = true
|
||||
}
|
||||
unitFile, err := os.Create(path.Join(systemdServiceDir, unitName))
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
defer unitFile.Close()
|
||||
|
||||
_, err = io.Copy(unitFile, unit.Serialize(units))
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
return unitName, needReload, nil
|
||||
}
|
||||
|
||||
// RunPod first creates the unit file for a pod, and then calls
|
||||
// StartUnit over d-bus.
|
||||
func (r *Runtime) RunPod(pod *api.Pod, volumeMap map[string]volume.Volume) error {
|
||||
glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name)
|
||||
|
||||
name, needReload, err := r.preparePod(pod, volumeMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if needReload {
|
||||
// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
|
||||
r.systemd.KillUnit(name, int32(syscall.SIGKILL))
|
||||
if err := r.systemd.Reload(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yifan): This is the old version of go-systemd. Should update when libcontainer updates
|
||||
// its version of go-systemd.
|
||||
_, err = r.systemd.StartUnit(name, "replace")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user