Adding endpoint for log retrieval on the minion
This commit is contained in:
		| @@ -25,6 +25,7 @@ import ( | ||||
| 	"sort" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"io" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/fsouza/go-dockerclient" | ||||
| @@ -46,6 +47,7 @@ type DockerInterface interface { | ||||
| 	StartContainer(id string, hostConfig *docker.HostConfig) error | ||||
| 	StopContainer(id string, timeout uint) error | ||||
| 	PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error | ||||
| 	Logs(opts docker.LogsOptions) error | ||||
| } | ||||
|  | ||||
| // DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids | ||||
| @@ -202,6 +204,26 @@ func GetRecentDockerContainersWithNameAndUUID(client DockerInterface, podFullNam | ||||
| 	return result, nil | ||||
| } | ||||
|  | ||||
| // GetKubeletDockerContainerLogs returns logs of specific container | ||||
| func GetKubeletDockerContainerLogs(client DockerInterface, containerID, tail string, follow bool, writer io.Writer) (err error) { | ||||
| 	opts := docker.LogsOptions{ | ||||
| 		Container:    containerID, | ||||
| 		Stdout:       true, | ||||
| 		Stderr:       true, | ||||
| 		OutputStream: writer, | ||||
| 		ErrorStream:  writer, | ||||
| 		Timestamps:   true, | ||||
| 		RawTerminal:  true, | ||||
| 	} | ||||
|  | ||||
| 	if opts.Follow = follow; follow == false { | ||||
| 		opts.Tail = tail | ||||
| 	} | ||||
|  | ||||
| 	err = client.Logs(opts) | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // ErrNoContainersInPod is returned when there are no containers for a given pod | ||||
| var ErrNoContainersInPod = errors.New("no containers exist for this pod") | ||||
|  | ||||
|   | ||||
| @@ -111,6 +111,15 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error { | ||||
| 	return f.Err | ||||
| } | ||||
|  | ||||
| // Logs is a test-spy implementation of DockerInterface.Logs. | ||||
| // It adds an entry "logs" to the internal method call record. | ||||
| func (f *FakeDockerClient) Logs(opts docker.LogsOptions) error { | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
| 	f.called = append(f.called, "logs") | ||||
| 	return f.Err | ||||
| } | ||||
|  | ||||
| // PullImage is a test-spy implementation of DockerInterface.StopContainer. | ||||
| // It adds an entry "pull" to the internal method call record. | ||||
| func (f *FakeDockerClient) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error { | ||||
|   | ||||
| @@ -20,6 +20,8 @@ import ( | ||||
| 	"fmt" | ||||
| 	"net" | ||||
| 	"strconv" | ||||
| 	"net/http" | ||||
| 	"io" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||
| @@ -95,3 +97,21 @@ func (h *httpActionHandler) Run(podFullName, uuid string, container *api.Contain | ||||
| 	_, err := h.client.Get(url) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| // flusherWriter provides wrapper for responseWriter with HTTP streaming capabilities | ||||
| type FlushWriter struct { | ||||
| 	flusher http.Flusher | ||||
| 	writer io.Writer | ||||
| } | ||||
|  | ||||
| // Write is a flushWriter implementation of the io.Writer that sends any buffered data to the client. | ||||
| func (fw *FlushWriter) Write(p []byte) (n int, err error) { | ||||
| 	n, err = fw.writer.Write(p) | ||||
| 	if err != nil { | ||||
| 		return n, err | ||||
| 	} | ||||
| 	if fw.flusher != nil { | ||||
| 		fw.flusher.Flush() | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|   | ||||
| @@ -26,6 +26,7 @@ import ( | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 	"io" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" | ||||
| @@ -751,6 +752,11 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai | ||||
| 	return cinfo, nil | ||||
| } | ||||
|  | ||||
| // GetKubeletContainerLogs returns logs from the container | ||||
| func (kl *Kubelet) GetKubeletContainerLogs(containerID, tail string, follow bool, writer io.Writer) error { | ||||
| 	return dockertools.GetKubeletDockerContainerLogs(kl.dockerClient, containerID, tail , follow, writer) | ||||
| } | ||||
|  | ||||
| // GetPodInfo returns information from Docker about the containers in a pod | ||||
| func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) { | ||||
| 	return dockertools.GetDockerPodInfo(kl.dockerClient, podFullName, uuid) | ||||
|   | ||||
| @@ -68,6 +68,7 @@ type HostInterface interface { | ||||
| 	GetMachineInfo() (*info.MachineInfo, error) | ||||
| 	GetPodInfo(name, uuid string) (api.PodInfo, error) | ||||
| 	RunInContainer(name, uuid, container string, cmd []string) ([]byte, error) | ||||
| 	GetKubeletContainerLogs(containerID, tail string, follow bool, writer io.Writer) error | ||||
| 	ServeLogs(w http.ResponseWriter, req *http.Request) | ||||
| } | ||||
|  | ||||
| @@ -92,6 +93,7 @@ func (s *Server) InstallDefaultHandlers() { | ||||
| 	s.mux.HandleFunc("/logs/", s.handleLogs) | ||||
| 	s.mux.HandleFunc("/spec/", s.handleSpec) | ||||
| 	s.mux.HandleFunc("/run/", s.handleRun) | ||||
| 	s.mux.HandleFunc("/containerLogs", s.handleContainerLogs) | ||||
| } | ||||
|  | ||||
| // error serializes an error object into an HTTP response. | ||||
| @@ -143,7 +145,45 @@ func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) { | ||||
|  | ||||
| } | ||||
|  | ||||
| // handlePodInfo handles podInfo requests against the Kubelet. | ||||
| // handleContainerLogs handles containerLogs request againts the Kubelet | ||||
| func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { | ||||
| 	defer req.Body.Close() | ||||
| 	u, err := url.ParseRequestURI(req.RequestURI) | ||||
| 	if err != nil { | ||||
| 		s.error(w, err) | ||||
| 		return | ||||
| 	} | ||||
| 	uriValues := u.Query() | ||||
|  | ||||
| 	containerID := uriValues.Get("containerID") | ||||
| 	follow := uriValues.Get("follow") == "1" | ||||
| 	tail := uriValues.Get("tail") | ||||
|  | ||||
| 	if len(containerID) == 0 { | ||||
| 		w.WriteHeader(http.StatusBadRequest) | ||||
| 		http.Error(w, "Missing 'containerID=' query entry.", http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
| 	logWriter := httplog.LogOf(req, w) | ||||
| 	w = httplog.Unlogged(w) | ||||
| 	fw := FlushWriter{writer: w} | ||||
| 	if flusher, ok := w.(http.Flusher); ok { | ||||
| 		fw.flusher = flusher | ||||
| 	} else { | ||||
| 		logWriter.Addf("unable to get Flusher") | ||||
| 		http.NotFound(w, req) | ||||
| 		return | ||||
| 	} | ||||
| 	w.Header().Set("Transfer-Encoding", "chunked") | ||||
| 	w.WriteHeader(http.StatusOK) | ||||
| 	err = s.host.GetKubeletContainerLogs(containerID, tail, follow, &fw) | ||||
| 	if err != nil { | ||||
| 		s.error(w, err) | ||||
| 		return | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // handlePodInfo handles podInfo requests against the Kubelet | ||||
| func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) { | ||||
| 	u, err := url.ParseRequestURI(req.RequestURI) | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -27,6 +27,7 @@ import ( | ||||
| 	"reflect" | ||||
| 	"strings" | ||||
| 	"testing" | ||||
| 	"io" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||
| @@ -35,12 +36,17 @@ import ( | ||||
| ) | ||||
|  | ||||
| type fakeKubelet struct { | ||||
| 	infoFunc          func(name string) (api.PodInfo, error) | ||||
| 	containerInfoFunc func(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) | ||||
| 	rootInfoFunc      func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error) | ||||
| 	machineInfoFunc   func() (*info.MachineInfo, error) | ||||
| 	logFunc           func(w http.ResponseWriter, req *http.Request) | ||||
| 	runFunc           func(podFullName, uuid, containerName string, cmd []string) ([]byte, error) | ||||
| 	infoFunc           func(name string) (api.PodInfo, error) | ||||
| 	containerInfoFunc  func(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) | ||||
| 	rootInfoFunc       func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error) | ||||
| 	machineInfoFunc    func() (*info.MachineInfo, error) | ||||
| 	logFunc            func(w http.ResponseWriter, req *http.Request) | ||||
| 	runFunc            func(podFullName, uuid, containerName string, cmd []string) ([]byte, error) | ||||
| 	containerLogsFunc  func(containerID, tail string, follow bool, writer io.Writer)  error | ||||
| } | ||||
|  | ||||
| func (fk *fakeKubelet) GetKubeletContainerLogs(containerID, tail string, follow bool, writer io.Writer) error { | ||||
| 	return fk.containerLogsFunc(containerID, tail, follow, writer) | ||||
| } | ||||
|  | ||||
| func (fk *fakeKubelet) GetPodInfo(name, uuid string) (api.PodInfo, error) { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 jhadvig
					jhadvig