Merge pull request #257 from Random-Liu/add-image-stats

Add image stats and integration test
This commit is contained in:
Lantao Liu 2017-09-25 15:35:30 -07:00 committed by GitHub
commit b9200ac403
31 changed files with 1859 additions and 42 deletions

View File

@ -61,9 +61,12 @@ jobs:
script:
- make install.deps
- make test
- make test-integration
- make test-cri
after_script:
# Abuse travis to preserve the log.
- cat /tmp/test-integration/cri-containerd.log
- cat /tmp/test-integration/containerd.log
- cat /tmp/test-cri/cri-containerd.log
- cat /tmp/test-cri/containerd.log
go: 1.8.x

View File

@ -26,7 +26,8 @@ VERSION := $(VERSION:v%=%)
TARBALL := cri-containerd-$(VERSION).tar.gz
BUILD_TAGS := apparmor
GO_LDFLAGS := -X $(PROJECT)/pkg/version.criContainerdVersion=$(VERSION)
SOURCES := $(shell find . -name '*.go')
SOURCES := $(shell find cmd/ pkg/ vendor/ -name '*.go')
INTEGRATION_SOURCES := $(shell find integration/ -name '*.go')
all: binaries
@ -35,20 +36,21 @@ default: help
help:
@echo "Usage: make <target>"
@echo
@echo " * 'install' - Install binaries to system locations"
@echo " * 'binaries' - Build cri-containerd"
@echo " * 'static-binaries - Build static cri-containerd"
@echo " * 'release' - Build release tarball"
@echo " * 'push' - Push release tarball to GCS"
@echo " * 'test' - Test cri-containerd"
@echo " * 'test-cri' - Test cri-containerd with cri validation test"
@echo " * 'test-e2e-node' - Test cri-containerd with Kubernetes node e2e test"
@echo " * 'clean' - Clean artifacts"
@echo " * 'verify' - Execute the source code verification tools"
@echo " * 'install.tools' - Install tools used by verify"
@echo " * 'install.deps' - Install dependencies of cri-containerd (containerd, runc, cni) Note: BUILDTAGS defaults to 'seccomp apparmor' for runc build"
@echo " * 'uninstall' - Remove installed binaries from system locations"
@echo " * 'version' - Print current cri-containerd release version"
@echo " * 'install' - Install binaries to system locations"
@echo " * 'binaries' - Build cri-containerd"
@echo " * 'static-binaries - Build static cri-containerd"
@echo " * 'release' - Build release tarball"
@echo " * 'push' - Push release tarball to GCS"
@echo " * 'test' - Test cri-containerd with unit test"
@echo " * 'test-integration' - Test cri-containerd with integration test"
@echo " * 'test-cri' - Test cri-containerd with cri validation test"
@echo " * 'test-e2e-node' - Test cri-containerd with Kubernetes node e2e test"
@echo " * 'clean' - Clean artifacts"
@echo " * 'verify' - Execute the source code verification tools"
@echo " * 'install.tools' - Install tools used by verify"
@echo " * 'install.deps' - Install dependencies of cri-containerd (containerd, runc, cni) Note: BUILDTAGS defaults to 'seccomp apparmor' for runc build"
@echo " * 'uninstall' - Remove installed binaries from system locations"
@echo " * 'version' - Print current cri-containerd release version"
verify: lint gofmt boiler
@ -80,6 +82,12 @@ test:
-ldflags '$(GO_LDFLAGS)' \
-gcflags '$(GO_GCFLAGS)'
$(BUILD_DIR)/integration.test: $(INTEGRATION_SOURCES)
go test -c $(PROJECT)/integration -o $(BUILD_DIR)/integration.test
test-integration: $(BUILD_DIR)/integration.test binaries
@./hack/test-integration.sh
test-cri: binaries
@./hack/test-cri.sh
@ -147,6 +155,7 @@ install.tools: .install.gitvalidation .install.gometalinter
install \
lint \
test \
test-integration \
test-cri \
test-e2e-node \
uninstall \

View File

@ -31,6 +31,8 @@ const configFilePathArgName = "config"
// ContainerdConfig contains config related to containerd
type ContainerdConfig struct {
// ContainerdRootDir is the root directory path for containerd.
ContainerdRootDir string `toml:"root"`
// ContainerdSnapshotter is the snapshotter used by containerd.
ContainerdSnapshotter string `toml:"snapshotter"`
// ContainerdEndpoint is the containerd endpoint path.
@ -66,6 +68,8 @@ type Config struct {
EnableSelinux bool `toml:"enable_selinux"`
// SandboxImage is the image used by sandbox container.
SandboxImage string `toml:"sandbox_image"`
// StatsCollectPeriod is the period (in seconds) of snapshots stats collection.
StatsCollectPeriod int `toml:"stats_collect_period"`
}
// CRIContainerdOptions contains cri-containerd command line and toml options.
@ -93,6 +97,9 @@ func (c *CRIContainerdOptions) AddFlags(fs *pflag.FlagSet) {
"/var/run/cri-containerd.sock", "Path to the socket which cri-containerd serves on.")
fs.StringVar(&c.RootDir, "root-dir",
"/var/lib/cri-containerd", "Root directory path for cri-containerd managed files (metadata checkpoint etc).")
fs.StringVar(&c.ContainerdRootDir, "containerd-root-dir",
"/var/lib/containerd", "Root directory path where containerd stores persistent data. "+
"This should be the same with containerd `root`.")
fs.StringVar(&c.ContainerdEndpoint, "containerd-endpoint",
"/run/containerd/containerd.sock", "Path to the containerd endpoint.")
fs.StringVar(&c.ContainerdSnapshotter, "containerd-snapshotter",
@ -113,6 +120,8 @@ func (c *CRIContainerdOptions) AddFlags(fs *pflag.FlagSet) {
false, "Enable selinux support.")
fs.StringVar(&c.SandboxImage, "sandbox-image",
"gcr.io/google_containers/pause:3.0", "The image used by sandbox container.")
fs.IntVar(&c.StatsCollectPeriod, "stats-collect-period",
10, "The period (in seconds) of snapshots stats collection.")
fs.BoolVar(&c.PrintDefaultConfig, "default-config",
false, "Print default toml config of cri-containerd and quit.")
}

View File

@ -22,6 +22,7 @@ source $(dirname "${BASH_SOURCE[0]}")/test-utils.sh
FOCUS=${FOCUS:-}
# SKIP skips the test to skip.
SKIP=${SKIP:-""}
# REPORT_DIR is the the directory to store test logs.
REPORT_DIR=${REPORT_DIR:-"/tmp/test-cri"}
# Check GOPATH
@ -35,7 +36,6 @@ GOPATH=${GOPATH%%:*}
CRITEST=${GOPATH}/bin/critest
CRITOOL_PKG=github.com/kubernetes-incubator/cri-tools
CRICONTAINERD_SOCK=/var/run/cri-containerd.sock
# Install critest
if [ ! -x "$(command -v ${CRITEST})" ]; then
@ -48,12 +48,12 @@ fi
which ${CRITEST}
mkdir -p ${REPORT_DIR}
start_cri_containerd ${REPORT_DIR}
test_setup ${REPORT_DIR}
# Run cri validation test
sudo env PATH=${PATH} GOPATH=${GOPATH} ${CRITEST} --runtime-endpoint=${CRICONTAINERD_SOCK} --focus="${FOCUS}" --ginkgo-flags="--skip=\"${SKIP}\"" validation
test_exit_code=$?
kill_cri_containerd
test_teardown
exit ${test_exit_code}

View File

@ -26,7 +26,9 @@ DEFAULT_SKIP+="|ImageID"
export FOCUS=${FOCUS:-""}
# SKIP skips the test to skip.
export SKIP=${SKIP:-${DEFAULT_SKIP}}
# REPORT_DIR is the the directory to store test logs.
REPORT_DIR=${REPORT_DIR:-"/tmp/test-e2e-node"}
# UPLOAD_LOG indicates whether to upload test log to gcs.
UPLOAD_LOG=${UPLOAD_LOG:-false}
# Check GOPATH
@ -67,18 +69,18 @@ git fetch --all
git checkout ${KUBERNETES_VERSION}
mkdir -p ${REPORT_DIR}
start_cri_containerd ${REPORT_DIR}
test_setup ${REPORT_DIR}
make test-e2e-node \
RUNTIME=remote \
CONTAINER_RUNTIME_ENDPOINT=unix:///var/run/cri-containerd.sock \
CONTAINER_RUNTIME_ENDPOINT=unix://${CRICONTAINERD_SOCK} \
ARTIFACTS=${REPORT_DIR} \
TEST_ARGS='--kubelet-flags=--cgroups-per-qos=true \
--kubelet-flags=--cgroup-root=/ \
--prepull-images=false'
test_exit_code=$?
kill_cri_containerd
test_teardown
sudo iptables-restore < ${ORIGINAL_RULES}
rm ${ORIGINAL_RULES}

36
hack/test-integration.sh Executable file
View File

@ -0,0 +1,36 @@
#!/bin/bash
# Copyright 2017 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o nounset
set -o pipefail
source $(dirname "${BASH_SOURCE[0]}")/test-utils.sh
cd ${ROOT}
# FOCUS focuses the test to run.
FOCUS=${FOCUS:-""}
# REPORT_DIR is the the directory to store test logs.
REPORT_DIR=${REPORT_DIR:-"/tmp/test-integration"}
mkdir -p ${REPORT_DIR}
test_setup ${REPORT_DIR}
# Run integration test.
sudo ${ROOT}/_output/integration.test --test.run="${FOCUS}" --test.v
test_exit_code=$?
test_teardown
exit ${test_exit_code}

View File

@ -19,8 +19,10 @@ ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/..
# CRI_CONTAINERD_FLAGS are the extra flags to use when start cri-containerd.
CRI_CONTAINERD_FLAGS=${CRI_CONTAINERD_FLAGS:-""}
# start_cri_containerd starts containerd and cri-containerd.
start_cri_containerd() {
CRICONTAINERD_SOCK=/var/run/cri-containerd.sock
# test_setup starts containerd and cri-containerd.
test_setup() {
local report_dir=$1
if [ ! -x ${ROOT}/_output/cri-containerd ]; then
echo "cri-containerd is not built"
@ -32,29 +34,35 @@ start_cri_containerd() {
echo "containerd is not installed, please run hack/install-deps.sh"
exit 1
fi
kill_cri_containerd
sudo containerd -l debug &> ${report_dir}/containerd.log &
sudo pkill containerd
sudo containerd &> ${report_dir}/containerd.log &
# Wait for containerd to be running by using the containerd client ctr to check the version
# of the containerd server. Wait an increasing amount of time after each of five attempts
local MAX_ATTEMPTS=5
local attempt_num=1
until sudo ctr version &> /dev/null || (( attempt_num == MAX_ATTEMPTS ))
do
echo "attempt $attempt_num to connect to containerd failed! Trying again in $attempt_num seconds..."
sleep $(( attempt_num++ ))
done
readiness_check "sudo ctr version"
# Start cri-containerd
sudo ${ROOT}/_output/cri-containerd --alsologtostderr --v 4 ${CRI_CONTAINERD_FLAGS} \
&> ${report_dir}/cri-containerd.log &
readiness_check "sudo ${GOPATH}/bin/crictl --runtime-endpoint=${CRICONTAINERD_SOCK} info"
}
# kill_cri_containerd kills containerd and cri-containerd.
kill_cri_containerd() {
# test_teardown kills containerd and cri-containerd.
test_teardown() {
sudo pkill containerd
}
# readiness_check checks readiness of a daemon with specified command.
readiness_check() {
local command=$1
local MAX_ATTEMPTS=5
local attempt_num=1
until ${command} &> /dev/null || (( attempt_num == MAX_ATTEMPTS ))
do
echo "$attempt_num attempt \"$command\"! Trying again in $attempt_num seconds..."
sleep $(( attempt_num++ ))
done
}
# upload_logs_to_gcs uploads test logs to gcs.
# Var set:
# 1. Bucket: gcs bucket to upload logs.

View File

@ -0,0 +1,80 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"fmt"
"io/ioutil"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
func TestImageFSInfo(t *testing.T) {
t.Logf("Pull an image to make sure image fs is not empty")
img, err := imageService.PullImage(&runtime.ImageSpec{Image: "busybox"}, nil)
require.NoError(t, err)
defer func() {
err := imageService.RemoveImage(&runtime.ImageSpec{Image: img})
assert.NoError(t, err)
}()
t.Logf("Create a sandbox to make sure there is an active snapshot")
config := PodSandboxConfig("running-pod", "imagefs")
sb, err := runtimeService.RunPodSandbox(config)
require.NoError(t, err)
defer func() {
assert.NoError(t, runtimeService.StopPodSandbox(sb))
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()
// It takes time to populate imagefs stats. Use eventually
// to check for a period of time.
t.Logf("Check imagefs info")
var info *runtime.FilesystemUsage
require.NoError(t, Eventually(func() (bool, error) {
stats, err := imageService.ImageFsInfo()
if err != nil {
return false, err
}
if len(stats) == 0 {
return false, nil
}
if len(stats) >= 2 {
return false, fmt.Errorf("unexpected stats length: %d", len(stats))
}
info = stats[0]
if info.GetTimestamp() != 0 &&
info.GetUsedBytes().GetValue() != 0 &&
info.GetInodesUsed().GetValue() != 0 &&
info.GetStorageId().GetUuid() != "" {
return true, nil
}
return false, nil
}, time.Second, 30*time.Second))
t.Logf("Device uuid should exist")
files, err := ioutil.ReadDir("/dev/disk/by-uuid")
require.NoError(t, err)
var names []string
for _, f := range files {
names = append(names, f.Name())
}
assert.Contains(t, names, info.GetStorageId().GetUuid())
}

95
integration/test_utils.go Normal file
View File

@ -0,0 +1,95 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"errors"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/remote"
"github.com/kubernetes-incubator/cri-containerd/pkg/util"
)
const (
sock = "/var/run/cri-containerd.sock"
timeout = 1 * time.Minute
)
var (
runtimeService cri.RuntimeService
imageService cri.ImageManagerService
)
func init() {
var err error
runtimeService, err = remote.NewRemoteRuntimeService(sock, timeout)
if err != nil {
glog.Exitf("Failed to create runtime service: %v", err)
}
imageService, err = remote.NewRemoteImageService(sock, timeout)
if err != nil {
glog.Exitf("Failed to create image service: %v", err)
}
}
// Opts sets specific information in pod sandbox config.
type PodSandboxOpts func(*runtime.PodSandboxConfig)
// PodSandboxConfig generates a pod sandbox config for test.
func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandboxConfig {
config := &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: name,
// Using random id as uuid is good enough for local
// integration test.
Uid: util.GenerateID(),
Namespace: ns,
},
Linux: &runtime.LinuxPodSandboxConfig{},
}
for _, opt := range opts {
opt(config)
}
return config
}
// CheckFunc is the function used to check a condition is true/false.
type CheckFunc func() (bool, error)
// Eventually waits for f to return true, it checks every period, and
// returns error if timeout exceeds. If f returns error, Eventually
// will return the same error immediately.
func Eventually(f CheckFunc, period, timeout time.Duration) error {
start := time.Now()
for {
done, err := f()
if done {
return nil
}
if err != nil {
return err
}
if time.Since(start) >= timeout {
return errors.New("timeout exceeded")
}
time.Sleep(period)
}
}

View File

@ -17,11 +17,13 @@ limitations under the License.
package os
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
containerdmount "github.com/containerd/containerd/mount"
"github.com/containerd/fifo"
"github.com/docker/docker/pkg/mount"
"golang.org/x/net/context"
@ -41,6 +43,8 @@ type OS interface {
Mount(source string, target string, fstype string, flags uintptr, data string) error
Unmount(target string, flags int) error
GetMounts() ([]*mount.Info, error)
LookupMount(path string) (containerdmount.Info, error)
DeviceUUID(device string) (string, error)
}
// RealOS is used to dispatch the real system level operations.
@ -120,3 +124,36 @@ func (RealOS) Unmount(target string, flags int) error {
func (RealOS) GetMounts() ([]*mount.Info, error) {
return mount.GetMounts()
}
// LookupMount gets mount info of a given path.
func (RealOS) LookupMount(path string) (containerdmount.Info, error) {
return containerdmount.Lookup(path)
}
// DeviceUUID gets device uuid of a device. The passed in device should be
// an absolute path of the device.
func (RealOS) DeviceUUID(device string) (string, error) {
const uuidDir = "/dev/disk/by-uuid"
if _, err := os.Stat(uuidDir); err != nil {
return "", err
}
files, err := ioutil.ReadDir(uuidDir)
if err != nil {
return "", err
}
for _, file := range files {
path := filepath.Join(uuidDir, file.Name())
target, err := os.Readlink(path)
if err != nil {
return "", err
}
dev, err := filepath.Abs(filepath.Join(uuidDir, target))
if err != nil {
return "", err
}
if dev == device {
return file.Name(), nil
}
}
return "", fmt.Errorf("device not found")
}

View File

@ -21,6 +21,7 @@ import (
"os"
"sync"
containerdmount "github.com/containerd/containerd/mount"
"github.com/docker/docker/pkg/mount"
"golang.org/x/net/context"
@ -50,6 +51,8 @@ type FakeOS struct {
MountFn func(source string, target string, fstype string, flags uintptr, data string) error
UnmountFn func(target string, flags int) error
GetMountsFn func() ([]*mount.Info, error)
LookupMountFn func(path string) (containerdmount.Info, error)
DeviceUUIDFn func(device string) (string, error)
calls []CalledDetail
errors map[string]error
}
@ -240,3 +243,29 @@ func (f *FakeOS) GetMounts() ([]*mount.Info, error) {
}
return nil, nil
}
// LookupMount is a fake call that invokes LookupMountFn or just return nil.
func (f *FakeOS) LookupMount(path string) (containerdmount.Info, error) {
f.appendCalls("LookupMount", path)
if err := f.getError("LookupMount"); err != nil {
return containerdmount.Info{}, err
}
if f.LookupMountFn != nil {
return f.LookupMountFn(path)
}
return containerdmount.Info{}, nil
}
// DeviceUUID is a fake call that invodes DeviceUUIDFn or just return nil.
func (f *FakeOS) DeviceUUID(device string) (string, error) {
f.appendCalls("DeviceUUID", device)
if err := f.getError("DeviceUUID"); err != nil {
return "", err
}
if f.DeviceUUIDFn != nil {
return f.DeviceUUIDFn(device)
}
return "", nil
}

View File

@ -17,7 +17,7 @@ limitations under the License.
package server
import (
"errors"
"time"
"golang.org/x/net/context"
@ -26,5 +26,26 @@ import (
// ImageFsInfo returns information of the filesystem that is used to store images.
func (c *criContainerdService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (*runtime.ImageFsInfoResponse, error) {
return nil, errors.New("not implemented")
snapshots := c.snapshotStore.List()
timestamp := time.Now().UnixNano()
var usedBytes, inodesUsed uint64
for _, sn := range snapshots {
// Use the oldest timestamp as the timestamp of imagefs info.
if sn.Timestamp < timestamp {
timestamp = sn.Timestamp
}
usedBytes += sn.Size
inodesUsed += sn.Inodes
}
// TODO(random-liu): Handle content store
return &runtime.ImageFsInfoResponse{
ImageFilesystems: []*runtime.FilesystemUsage{
{
Timestamp: timestamp,
StorageId: &runtime.StorageIdentifier{Uuid: c.imageFSUUID},
UsedBytes: &runtime.UInt64Value{Value: usedBytes},
InodesUsed: &runtime.UInt64Value{Value: inodesUsed},
},
},
}, nil
}

View File

@ -0,0 +1,70 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"testing"
"github.com/containerd/containerd/snapshot"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
snapshotstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/snapshot"
)
func TestImageFsInfo(t *testing.T) {
c := newTestCRIContainerdService()
snapshots := []snapshotstore.Snapshot{
{
Key: "key1",
Kind: snapshot.KindActive,
Size: 10,
Inodes: 100,
Timestamp: 234567,
},
{
Key: "key2",
Kind: snapshot.KindCommitted,
Size: 20,
Inodes: 200,
Timestamp: 123456,
},
{
Key: "key3",
Kind: snapshot.KindView,
Size: 0,
Inodes: 0,
Timestamp: 345678,
},
}
expected := &runtime.FilesystemUsage{
Timestamp: 123456,
StorageId: &runtime.StorageIdentifier{Uuid: testImageFSUUID},
UsedBytes: &runtime.UInt64Value{Value: 30},
InodesUsed: &runtime.UInt64Value{Value: 300},
}
for _, sn := range snapshots {
c.snapshotStore.Add(sn)
}
resp, err := c.ImageFsInfo(context.Background(), &runtime.ImageFsInfoRequest{})
require.NoError(t, err)
stats := resp.GetImageFilesystems()
assert.Len(t, stats, 1)
assert.Equal(t, expected, stats[0])
}

View File

@ -280,3 +280,14 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
}()
return in.criContainerdService.RemoveImage(ctx, r)
}
func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (res *runtime.ImageFsInfoResponse, err error) {
glog.V(4).Infof("ImageFsInfo")
defer func() {
if err != nil {
glog.Errorf("ImageFsInfo failed, error: %v", err)
} else {
glog.V(4).Infof("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
}()
return in.criContainerdService.ImageFsInfo(ctx, r)
}

View File

@ -20,12 +20,15 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"syscall"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/plugin"
"github.com/cri-o/ocicni/pkg/ocicni"
"github.com/golang/glog"
"golang.org/x/net/context"
@ -39,6 +42,7 @@ import (
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image"
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
snapshotstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/snapshot"
)
const (
@ -60,6 +64,8 @@ type CRIContainerdService interface {
type criContainerdService struct {
// config contains all configurations.
config options.Config
// imageFSUUID is the device uuid of image filesystem.
imageFSUUID string
// server is the grpc server.
server *grpc.Server
// os is an interface for all required os operations.
@ -76,6 +82,8 @@ type criContainerdService struct {
containerNameIndex *registrar.Registrar
// imageStore stores all resources associated with images.
imageStore *imagestore.Store
// snapshotStore stores information of all snapshots.
snapshotStore *snapshotstore.Store
// taskService is containerd tasks client.
taskService tasks.TasksClient
// contentStoreService is the containerd content service client.
@ -113,6 +121,7 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
sandboxStore: sandboxstore.NewStore(),
containerStore: containerstore.NewStore(),
imageStore: imagestore.NewStore(),
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
taskService: client.TaskService(),
@ -121,11 +130,16 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
client: client,
}
netPlugin, err := ocicni.InitCNI(config.NetworkPluginConfDir, config.NetworkPluginBinDir)
imageFSPath := imageFSPath(config.ContainerdRootDir, config.ContainerdSnapshotter)
c.imageFSUUID, err = c.getDeviceUUID(imageFSPath)
if err != nil {
return nil, fmt.Errorf("failed to get imagefs uuid: %v", err)
}
c.netPlugin, err = ocicni.InitCNI(config.NetworkPluginConfDir, config.NetworkPluginBinDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize cni plugin: %v", err)
}
c.netPlugin = netPlugin
// prepare streaming server
c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort)
@ -156,6 +170,15 @@ func (c *criContainerdService) Run() error {
glog.V(2).Info("Start event monitor")
eventMonitorCloseCh := c.eventMonitor.start()
// Start snapshot stats syncer, it doesn't need to be stopped.
glog.V(2).Info("Start snapshots syncer")
snapshotsSyncer := newSnapshotsSyncer(
c.snapshotStore,
c.client.SnapshotService(c.config.ContainerdSnapshotter),
time.Duration(c.config.StatsCollectPeriod)*time.Second,
)
snapshotsSyncer.start()
// Start streaming server.
glog.V(2).Info("Start streaming server")
streamServerCloseCh := make(chan struct{})
@ -209,3 +232,18 @@ func (c *criContainerdService) Stop() {
c.streamServer.Stop() // nolint: errcheck
c.server.Stop()
}
// getDeviceUUID gets device uuid for a given path.
func (c *criContainerdService) getDeviceUUID(path string) (string, error) {
info, err := c.os.LookupMount(path)
if err != nil {
return "", err
}
return c.os.DeviceUUID(info.Source)
}
// imageFSPath returns containerd image filesystem path.
// Note that if containerd changes directory layout, we also needs to change this.
func imageFSPath(rootDir, snapshotter string) string {
return filepath.Join(rootDir, fmt.Sprintf("%s.%s", plugin.SnapshotPlugin, snapshotter))
}

View File

@ -24,6 +24,7 @@ import (
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image"
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
snapshotstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/snapshot"
)
const (
@ -32,6 +33,7 @@ const (
// TODO(random-liu): Change this to image name after we have complete image
// management unit test framework.
testSandboxImage = "sha256:c75bebcdd211f41b3a460c7bf82970ed6c75acaab9cd4c9a4e125b03ca113798"
testImageFSUUID = "test-image-fs-uuid"
)
// newTestCRIContainerdService creates a fake criContainerdService for test.
@ -41,9 +43,11 @@ func newTestCRIContainerdService() *criContainerdService {
RootDir: testRootDir,
SandboxImage: testSandboxImage,
},
imageFSUUID: testImageFSUUID,
os: ostesting.NewFakeOS(),
sandboxStore: sandboxstore.NewStore(),
imageStore: imagestore.NewStore(),
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerStore: containerstore.NewStore(),
containerNameIndex: registrar.NewRegistrar(),

110
pkg/server/snapshots.go Normal file
View File

@ -0,0 +1,110 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/snapshot"
"github.com/golang/glog"
snapshotstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/snapshot"
)
// snapshotsSyncer syncs snapshot stats periodically. imagefs info and container stats
// should both use cached result here.
// TODO(random-liu): Benchmark with high workload. We may need a statsSyncer instead if
// benchmark result shows that container cpu/memory stats also need to be cached.
type snapshotsSyncer struct {
store *snapshotstore.Store
snapshotter snapshot.Snapshotter
syncPeriod time.Duration
}
// newSnapshotsSyncer creates a snapshot syncer.
func newSnapshotsSyncer(store *snapshotstore.Store, snapshotter snapshot.Snapshotter,
period time.Duration) *snapshotsSyncer {
return &snapshotsSyncer{
store: store,
snapshotter: snapshotter,
syncPeriod: period,
}
}
// start starts the snapshots syncer. No stop function is needed because
// the syncer doesn't update any persistent states, it's fine to let it
// exit with the process.
func (s *snapshotsSyncer) start() {
tick := time.NewTicker(s.syncPeriod)
go func() {
defer tick.Stop()
// TODO(random-liu): This is expensive. We should do benchmark to
// check the resource usage and optimize this.
for {
if err := s.sync(); err != nil {
glog.Errorf("Failed to sync snapshot stats: %v", err)
}
<-tick.C
}
}()
}
// sync updates all snapshots stats.
func (s *snapshotsSyncer) sync() error {
start := time.Now().UnixNano()
collect := func(ctx context.Context, info snapshot.Info) error {
sn, err := s.store.Get(info.Name)
if err == nil {
// Only update timestamp for non-active snapshot.
if sn.Kind == info.Kind && sn.Kind != snapshot.KindActive {
sn.Timestamp = time.Now().UnixNano()
s.store.Add(sn)
return nil
}
}
// Get newest stats if the snapshot is new or active.
sn = snapshotstore.Snapshot{
Key: info.Name,
Kind: info.Kind,
Timestamp: time.Now().UnixNano(),
}
usage, err := s.snapshotter.Usage(ctx, info.Name)
if err != nil {
if errdefs.IsNotFound(err) {
return nil
}
return err
}
sn.Size = uint64(usage.Size)
sn.Inodes = uint64(usage.Inodes)
s.store.Add(sn)
return nil
}
if err := s.snapshotter.Walk(context.Background(), collect); err != nil {
return err
}
for _, sn := range s.store.List() {
if sn.Timestamp >= start {
continue
}
// Delete the snapshot stats if it's not updated this time.
s.store.Delete(sn.Key)
}
return nil
}

View File

@ -92,8 +92,8 @@ func (s *Store) List() []Image {
s.lock.RLock()
defer s.lock.RUnlock()
var images []Image
for _, sb := range s.images {
images = append(images, sb)
for _, i := range s.images {
images = append(images, i)
}
return images
}

View File

@ -99,7 +99,7 @@ func TestImageStore(t *testing.T) {
imgs = s.List()
assert.Len(imgs, 2)
t.Logf("get should return nil after deletion")
t.Logf("get should return empty struct and ErrNotExist after deletion")
img, err := s.Get(testID)
assert.Equal(Image{}, img)
assert.Equal(store.ErrNotExist, err)

View File

@ -0,0 +1,87 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapshot
import (
"sync"
"github.com/containerd/containerd/snapshot"
"github.com/kubernetes-incubator/cri-containerd/pkg/store"
)
// Snapshot contains the information about the snapshot.
type Snapshot struct {
// Key is the key of the snapshot
Key string
// Kind is the kind of the snapshot (active, commited, view)
Kind snapshot.Kind
// Size is the size of the snapshot in bytes.
Size uint64
// Inodes is the number of inodes used by the snapshot
Inodes uint64
// Timestamp is latest update time (in nanoseconds) of the snapshot
// information.
Timestamp int64
}
// Store stores all snapshots.
type Store struct {
lock sync.RWMutex
snapshots map[string]Snapshot
}
// NewStore creates a snapshot store.
func NewStore() *Store {
return &Store{snapshots: make(map[string]Snapshot)}
}
// Add a snapshot into the store.
func (s *Store) Add(snapshot Snapshot) {
s.lock.Lock()
defer s.lock.Unlock()
s.snapshots[snapshot.Key] = snapshot
}
// Get returns the snapshot with specified key. Returns store.ErrNotExist if the
// snapshot doesn't exist.
func (s *Store) Get(key string) (Snapshot, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if sn, ok := s.snapshots[key]; ok {
return sn, nil
}
return Snapshot{}, store.ErrNotExist
}
// List lists all snapshots.
func (s *Store) List() []Snapshot {
s.lock.RLock()
defer s.lock.RUnlock()
var snapshots []Snapshot
for _, sn := range s.snapshots {
snapshots = append(snapshots, sn)
}
return snapshots
}
// Delete deletes the snapshot with specified key.
func (s *Store) Delete(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.snapshots, key)
}

View File

@ -0,0 +1,84 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapshot
import (
"testing"
"time"
"github.com/containerd/containerd/snapshot"
assertlib "github.com/stretchr/testify/assert"
"github.com/kubernetes-incubator/cri-containerd/pkg/store"
)
func TestSnapshotStore(t *testing.T) {
snapshots := map[string]Snapshot{
"key1": {
Key: "key1",
Kind: snapshot.KindActive,
Size: 10,
Inodes: 100,
Timestamp: time.Now().UnixNano(),
},
"key2": {
Key: "key2",
Kind: snapshot.KindCommitted,
Size: 20,
Inodes: 200,
Timestamp: time.Now().UnixNano(),
},
"key3": {
Key: "key3",
Kind: snapshot.KindView,
Size: 0,
Inodes: 0,
Timestamp: time.Now().UnixNano(),
},
}
assert := assertlib.New(t)
s := NewStore()
t.Logf("should be able to add snapshot")
for _, sn := range snapshots {
s.Add(sn)
}
t.Logf("should be able to get snapshot")
for id, sn := range snapshots {
got, err := s.Get(id)
assert.NoError(err)
assert.Equal(sn, got)
}
t.Logf("should be able to list snapshot")
sns := s.List()
assert.Len(sns, 3)
testKey := "key2"
t.Logf("should be able to delete snapshot")
s.Delete(testKey)
sns = s.List()
assert.Len(sns, 2)
t.Logf("get should return empty struct and ErrNotExist after deletion")
sn, err := s.Get(testKey)
assert.Equal(Snapshot{}, sn)
assert.Equal(store.ErrNotExist, err)
}

View File

@ -0,0 +1,115 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cri
import (
"time"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
// RuntimeVersioner contains methods for runtime name, version and API version.
type RuntimeVersioner interface {
// Version returns the runtime name, runtime version and runtime API version
Version(apiVersion string) (*runtimeapi.VersionResponse, error)
}
// ContainerManager contains methods to manipulate containers managed by a
// container runtime. The methods are thread-safe.
type ContainerManager interface {
// CreateContainer creates a new container in specified PodSandbox.
CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
// StartContainer starts the container.
StartContainer(containerID string) error
// StopContainer stops a running container with a grace period (i.e., timeout).
StopContainer(containerID string, timeout int64) error
// RemoveContainer removes the container.
RemoveContainer(containerID string) error
// ListContainers lists all containers by filters.
ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error)
// ContainerStatus returns the status of the container.
ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error)
// UpdateContainerResources updates the cgroup resources for the container.
UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Exec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
}
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
type PodSandboxManager interface {
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
RunPodSandbox(config *runtimeapi.PodSandboxConfig) (string, error)
// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be force terminated.
StopPodSandbox(podSandboxID string) error
// RemovePodSandbox removes the sandbox. If there are running containers in the
// sandbox, they should be forcibly removed.
RemovePodSandbox(podSandboxID string) error
// PodSandboxStatus returns the Status of the PodSandbox.
PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error)
// ListPodSandbox returns a list of Sandbox.
ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}
// ContainerStatsManager contains methods for retriving the container
// statistics.
type ContainerStatsManager interface {
// ContainerStats returns stats of the container. If the container does not
// exist, the call returns an error.
ContainerStats(containerID string) (*runtimeapi.ContainerStats, error)
// ListContainerStats returns stats of all running containers.
ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error)
}
// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
type RuntimeService interface {
RuntimeVersioner
ContainerManager
PodSandboxManager
ContainerStatsManager
// UpdateRuntimeConfig updates runtime configuration if specified
UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error
// Status returns the status of the runtime.
Status() (*runtimeapi.RuntimeStatus, error)
}
// ImageManagerService interface should be implemented by a container image
// manager.
// The methods should be thread-safe.
type ImageManagerService interface {
// ListImages lists the existing images.
ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)
// ImageStatus returns the status of the image.
ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error)
// PullImage pulls an image with the authentication config.
PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) (string, error)
// RemoveImage removes the image.
RemoveImage(image *runtimeapi.ImageSpec) error
// ImageFsInfo returns information of the filesystem that is used to store images.
ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error)
}

19
vendor/k8s.io/kubernetes/pkg/kubelet/remote/doc.go generated vendored Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package remote containers gRPC implementation of internalapi.RuntimeService
// and internalapi.ImageManagerService.
package remote

View File

@ -0,0 +1,150 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"errors"
"fmt"
"time"
"github.com/golang/glog"
"google.golang.org/grpc"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/util"
)
// RemoteImageService is a gRPC implementation of internalapi.ImageManagerService.
type RemoteImageService struct {
timeout time.Duration
imageClient runtimeapi.ImageServiceClient
}
// NewRemoteImageService creates a new internalapi.ImageManagerService.
func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (internalapi.ImageManagerService, error) {
glog.V(3).Infof("Connecting to image service %s", endpoint)
addr, dailer, err := util.GetAddressAndDialer(endpoint)
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(connectionTimeout), grpc.WithDialer(dailer))
if err != nil {
glog.Errorf("Connect remote image service %s failed: %v", addr, err)
return nil, err
}
return &RemoteImageService{
timeout: connectionTimeout,
imageClient: runtimeapi.NewImageServiceClient(conn),
}, nil
}
// ListImages lists available images.
func (r *RemoteImageService) ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.imageClient.ListImages(ctx, &runtimeapi.ListImagesRequest{
Filter: filter,
})
if err != nil {
glog.Errorf("ListImages with filter %+v from image service failed: %v", filter, err)
return nil, err
}
return resp.Images, nil
}
// ImageStatus returns the status of the image.
func (r *RemoteImageService) ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
Image: image,
})
if err != nil {
glog.Errorf("ImageStatus %q from image service failed: %v", image.Image, err)
return nil, err
}
if resp.Image != nil {
if resp.Image.Id == "" || resp.Image.Size_ == 0 {
errorMessage := fmt.Sprintf("Id or size of image %q is not set", image.Image)
glog.Errorf("ImageStatus failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
}
return resp.Image, nil
}
// PullImage pulls an image with authentication config.
func (r *RemoteImageService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) (string, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := r.imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
Image: image,
Auth: auth,
})
if err != nil {
glog.Errorf("PullImage %q from image service failed: %v", image.Image, err)
return "", err
}
if resp.ImageRef == "" {
errorMessage := fmt.Sprintf("imageRef of image %q is not set", image.Image)
glog.Errorf("PullImage failed: %s", errorMessage)
return "", errors.New(errorMessage)
}
return resp.ImageRef, nil
}
// RemoveImage removes the image.
func (r *RemoteImageService) RemoveImage(image *runtimeapi.ImageSpec) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.imageClient.RemoveImage(ctx, &runtimeapi.RemoveImageRequest{
Image: image,
})
if err != nil {
glog.Errorf("RemoveImage %q from image service failed: %v", image.Image, err)
return err
}
return nil
}
// ImageFsInfo returns information of the filesystem that is used to store images.
func (r *RemoteImageService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) {
// Do not set timeout, because `ImageFsInfo` takes time.
// TODO(random-liu): Should we assume runtime should cache the result, and set timeout here?
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := r.imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{})
if err != nil {
glog.Errorf("ImageFsInfo from image service failed: %v", err)
return nil, err
}
return resp.GetImageFilesystems(), nil
}

View File

@ -0,0 +1,478 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"errors"
"fmt"
"strings"
"time"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/util"
utilexec "k8s.io/utils/exec"
)
// RemoteRuntimeService is a gRPC implementation of internalapi.RuntimeService.
type RemoteRuntimeService struct {
timeout time.Duration
runtimeClient runtimeapi.RuntimeServiceClient
}
// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
glog.Infof("Connecting to runtime service %s", endpoint)
addr, dailer, err := util.GetAddressAndDialer(endpoint)
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(connectionTimeout), grpc.WithDialer(dailer))
if err != nil {
glog.Errorf("Connect remote runtime %s failed: %v", addr, err)
return nil, err
}
return &RemoteRuntimeService{
timeout: connectionTimeout,
runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
}, nil
}
// Version returns the runtime name, runtime version and runtime API version.
func (r *RemoteRuntimeService) Version(apiVersion string) (*runtimeapi.VersionResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
typedVersion, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{
Version: apiVersion,
})
if err != nil {
glog.Errorf("Version from runtime service failed: %v", err)
return nil, err
}
if typedVersion.Version == "" || typedVersion.RuntimeName == "" || typedVersion.RuntimeApiVersion == "" || typedVersion.RuntimeVersion == "" {
return nil, fmt.Errorf("not all fields are set in VersionResponse (%q)", *typedVersion)
}
return typedVersion, err
}
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (string, error) {
// Use 2 times longer timeout for sandbox operation (4 mins by default)
// TODO: Make the pod sandbox timeout configurable.
ctx, cancel := getContextWithTimeout(r.timeout * 2)
defer cancel()
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
Config: config,
})
if err != nil {
glog.Errorf("RunPodSandbox from runtime service failed: %v", err)
return "", err
}
if resp.PodSandboxId == "" {
errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
glog.Errorf("RunPodSandbox failed: %s", errorMessage)
return "", errors.New(errorMessage)
}
return resp.PodSandboxId, nil
}
// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be forced to termination.
func (r *RemoteRuntimeService) StopPodSandbox(podSandBoxID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.StopPodSandbox(ctx, &runtimeapi.StopPodSandboxRequest{
PodSandboxId: podSandBoxID,
})
if err != nil {
glog.Errorf("StopPodSandbox %q from runtime service failed: %v", podSandBoxID, err)
return err
}
return nil
}
// RemovePodSandbox removes the sandbox. If there are any containers in the
// sandbox, they should be forcibly removed.
func (r *RemoteRuntimeService) RemovePodSandbox(podSandBoxID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{
PodSandboxId: podSandBoxID,
})
if err != nil {
glog.Errorf("RemovePodSandbox %q from runtime service failed: %v", podSandBoxID, err)
return err
}
return nil
}
// PodSandboxStatus returns the status of the PodSandbox.
func (r *RemoteRuntimeService) PodSandboxStatus(podSandBoxID string) (*runtimeapi.PodSandboxStatus, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{
PodSandboxId: podSandBoxID,
})
if err != nil {
return nil, err
}
if resp.Status != nil {
if err := verifySandboxStatus(resp.Status); err != nil {
return nil, err
}
}
return resp.Status, nil
}
// ListPodSandbox returns a list of PodSandboxes.
func (r *RemoteRuntimeService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{
Filter: filter,
})
if err != nil {
glog.Errorf("ListPodSandbox with filter %+v from runtime service failed: %v", filter, err)
return nil, err
}
return resp.Items, nil
}
// CreateContainer creates a new container in the specified PodSandbox.
func (r *RemoteRuntimeService) CreateContainer(podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
PodSandboxId: podSandBoxID,
Config: config,
SandboxConfig: sandboxConfig,
})
if err != nil {
glog.Errorf("CreateContainer in sandbox %q from runtime service failed: %v", podSandBoxID, err)
return "", err
}
if resp.ContainerId == "" {
errorMessage := fmt.Sprintf("ContainerId is not set for container %q", config.GetMetadata())
glog.Errorf("CreateContainer failed: %s", errorMessage)
return "", errors.New(errorMessage)
}
return resp.ContainerId, nil
}
// StartContainer starts the container.
func (r *RemoteRuntimeService) StartContainer(containerID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.StartContainer(ctx, &runtimeapi.StartContainerRequest{
ContainerId: containerID,
})
if err != nil {
glog.Errorf("StartContainer %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
// StopContainer stops a running container with a grace period (i.e., timeout).
func (r *RemoteRuntimeService) StopContainer(containerID string, timeout int64) error {
// Use timeout + default timeout (2 minutes) as timeout to leave extra time
// for SIGKILL container and request latency.
t := r.timeout + time.Duration(timeout)*time.Second
ctx, cancel := getContextWithTimeout(t)
defer cancel()
_, err := r.runtimeClient.StopContainer(ctx, &runtimeapi.StopContainerRequest{
ContainerId: containerID,
Timeout: timeout,
})
if err != nil {
glog.Errorf("StopContainer %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
// RemoveContainer removes the container. If the container is running, the container
// should be forced to removal.
func (r *RemoteRuntimeService) RemoveContainer(containerID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{
ContainerId: containerID,
})
if err != nil {
glog.Errorf("RemoveContainer %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
// ListContainers lists containers by filters.
func (r *RemoteRuntimeService) ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.ListContainers(ctx, &runtimeapi.ListContainersRequest{
Filter: filter,
})
if err != nil {
glog.Errorf("ListContainers with filter %+v from runtime service failed: %v", filter, err)
return nil, err
}
return resp.Containers, nil
}
// ContainerStatus returns the container status.
func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{
ContainerId: containerID,
})
if err != nil {
glog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err)
return nil, err
}
if resp.Status != nil {
if err := verifyContainerStatus(resp.Status); err != nil {
glog.Errorf("ContainerStatus of %q failed: %v", containerID, err)
return nil, err
}
}
return resp.Status, nil
}
// UpdateContainerResources updates a containers resource config
func (r *RemoteRuntimeService) UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.UpdateContainerResources(ctx, &runtimeapi.UpdateContainerResourcesRequest{
ContainerId: containerID,
Linux: resources,
})
if err != nil {
glog.Errorf("UpdateContainerResources %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (r *RemoteRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
// Do not set timeout when timeout is 0.
var ctx context.Context
var cancel context.CancelFunc
if timeout != 0 {
// Use timeout + default timeout (2 minutes) as timeout to leave some time for
// the runtime to do cleanup.
ctx, cancel = getContextWithTimeout(r.timeout + timeout)
} else {
ctx, cancel = getContextWithCancel()
}
defer cancel()
timeoutSeconds := int64(timeout.Seconds())
req := &runtimeapi.ExecSyncRequest{
ContainerId: containerID,
Cmd: cmd,
Timeout: timeoutSeconds,
}
resp, err := r.runtimeClient.ExecSync(ctx, req)
if err != nil {
glog.Errorf("ExecSync %s '%s' from runtime service failed: %v", containerID, strings.Join(cmd, " "), err)
return nil, nil, err
}
err = nil
if resp.ExitCode != 0 {
err = utilexec.CodeExitError{
Err: fmt.Errorf("command '%s' exited with %d: %s", strings.Join(cmd, " "), resp.ExitCode, resp.Stderr),
Code: int(resp.ExitCode),
}
}
return resp.Stdout, resp.Stderr, err
}
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.Exec(ctx, req)
if err != nil {
glog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
return nil, err
}
if resp.Url == "" {
errorMessage := "URL is not set"
glog.Errorf("Exec failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
return resp, nil
}
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (r *RemoteRuntimeService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.Attach(ctx, req)
if err != nil {
glog.Errorf("Attach %s from runtime service failed: %v", req.ContainerId, err)
return nil, err
}
if resp.Url == "" {
errorMessage := "URL is not set"
glog.Errorf("Exec failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
return resp, nil
}
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (r *RemoteRuntimeService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.PortForward(ctx, req)
if err != nil {
glog.Errorf("PortForward %s from runtime service failed: %v", req.PodSandboxId, err)
return nil, err
}
if resp.Url == "" {
errorMessage := "URL is not set"
glog.Errorf("Exec failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
return resp, nil
}
// UpdateRuntimeConfig updates the config of a runtime service. The only
// update payload currently supported is the pod CIDR assigned to a node,
// and the runtime service just proxies it down to the network plugin.
func (r *RemoteRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
// Response doesn't contain anything of interest. This translates to an
// Event notification to the network plugin, which can't fail, so we're
// really looking to surface destination unreachable.
_, err := r.runtimeClient.UpdateRuntimeConfig(ctx, &runtimeapi.UpdateRuntimeConfigRequest{
RuntimeConfig: runtimeConfig,
})
if err != nil {
return err
}
return nil
}
// Status returns the status of the runtime.
func (r *RemoteRuntimeService) Status() (*runtimeapi.RuntimeStatus, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.Status(ctx, &runtimeapi.StatusRequest{})
if err != nil {
glog.Errorf("Status from runtime service failed: %v", err)
return nil, err
}
if resp.Status == nil || len(resp.Status.Conditions) < 2 {
errorMessage := "RuntimeReady or NetworkReady condition are not set"
glog.Errorf("Status failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
return resp.Status, nil
}
// ContainerStats returns the stats of the container.
func (r *RemoteRuntimeService) ContainerStats(containerID string) (*runtimeapi.ContainerStats, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.ContainerStats(ctx, &runtimeapi.ContainerStatsRequest{
ContainerId: containerID,
})
if err != nil {
glog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err)
return nil, err
}
return resp.GetStats(), nil
}
func (r *RemoteRuntimeService) ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) {
// Do not set timeout, because writable layer stats collection takes time.
// TODO(random-liu): Should we assume runtime should cache the result, and set timeout here?
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := r.runtimeClient.ListContainerStats(ctx, &runtimeapi.ListContainerStatsRequest{
Filter: filter,
})
if err != nil {
glog.Errorf("ListContainerStats with filter %+v from runtime service failed: %v", filter, err)
return nil, err
}
return resp.GetStats(), nil
}

88
vendor/k8s.io/kubernetes/pkg/kubelet/remote/utils.go generated vendored Normal file
View File

@ -0,0 +1,88 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"fmt"
"time"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
// getContextWithTimeout returns a context with timeout.
func getContextWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), timeout)
}
// getContextWithCancel returns a context with cancel.
func getContextWithCancel() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
}
// verifySandboxStatus verified whether all required fields are set in PodSandboxStatus.
func verifySandboxStatus(status *runtimeapi.PodSandboxStatus) error {
if status.Id == "" {
return fmt.Errorf("Id is not set")
}
if status.Metadata == nil {
return fmt.Errorf("Metadata is not set")
}
metadata := status.Metadata
if metadata.Name == "" || metadata.Namespace == "" || metadata.Uid == "" {
return fmt.Errorf("Name, Namespace or Uid is not in metadata %q", metadata)
}
if status.CreatedAt == 0 {
return fmt.Errorf("CreatedAt is not set")
}
return nil
}
// verifyContainerStatus verified whether all required fields are set in ContainerStatus.
func verifyContainerStatus(status *runtimeapi.ContainerStatus) error {
if status.Id == "" {
return fmt.Errorf("Id is not set")
}
if status.Metadata == nil {
return fmt.Errorf("Metadata is not set")
}
metadata := status.Metadata
if metadata.Name == "" {
return fmt.Errorf("Name is not in metadata %q", metadata)
}
if status.CreatedAt == 0 {
return fmt.Errorf("CreatedAt is not set")
}
if status.Image == nil || status.Image.Image == "" {
return fmt.Errorf("Image is not set")
}
if status.ImageRef == "" {
return fmt.Errorf("ImageRef is not set")
}
return nil
}

18
vendor/k8s.io/kubernetes/pkg/kubelet/util/doc.go generated vendored Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Utility functions.
package util // import "k8s.io/kubernetes/pkg/kubelet/util"

47
vendor/k8s.io/kubernetes/pkg/kubelet/util/util.go generated vendored Normal file
View File

@ -0,0 +1,47 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net/url"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// FromApiserverCache modifies <opts> so that the GET request will
// be served from apiserver cache instead of from etcd.
func FromApiserverCache(opts *metav1.GetOptions) {
opts.ResourceVersion = "0"
}
func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}
if u.Scheme == "tcp" {
return "tcp", u.Host, nil
} else if u.Scheme == "unix" {
return "unix", u.Path, nil
} else if u.Scheme == "" {
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
} else {
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}

79
vendor/k8s.io/kubernetes/pkg/kubelet/util/util_unix.go generated vendored Normal file
View File

@ -0,0 +1,79 @@
// +build freebsd linux
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
"os"
"time"
"github.com/golang/glog"
"golang.org/x/sys/unix"
)
const (
// unixProtocol is the network protocol of unix socket.
unixProtocol = "unix"
)
func CreateListener(endpoint string) (net.Listener, error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return nil, err
}
if protocol != unixProtocol {
return nil, fmt.Errorf("only support unix socket endpoint")
}
// Unlink to cleanup the previous socket file.
err = unix.Unlink(addr)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to unlink socket file %q: %v", addr, err)
}
return net.Listen(protocol, addr)
}
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return "", nil, err
}
if protocol != unixProtocol {
return "", nil, fmt.Errorf("only support unix socket endpoint")
}
return addr, dial, nil
}
func dial(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(unixProtocol, addr, timeout)
}
func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) {
if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" {
fallbackEndpoint := fallbackProtocol + "://" + endpoint
protocol, addr, err = parseEndpoint(fallbackEndpoint)
if err == nil {
glog.Warningf("Using %q as endpoint is deprecated, please consider using full url format %q.", endpoint, fallbackEndpoint)
}
}
return
}

View File

@ -0,0 +1,33 @@
// +build !freebsd,!linux,!windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
"time"
)
func CreateListener(endpoint string) (net.Listener, error) {
return nil, fmt.Errorf("CreateListener is unsupported in this build")
}
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
return "", nil, fmt.Errorf("GetAddressAndDialer is unsupported in this build")
}

View File

@ -0,0 +1,57 @@
// +build windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
"time"
)
const (
tcpProtocol = "tcp"
)
func CreateListener(endpoint string) (net.Listener, error) {
protocol, addr, err := parseEndpoint(endpoint)
if err != nil {
return nil, err
}
if protocol != tcpProtocol {
return nil, fmt.Errorf("only support tcp endpoint")
}
return net.Listen(protocol, addr)
}
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
protocol, addr, err := parseEndpoint(endpoint)
if err != nil {
return "", nil, err
}
if protocol != tcpProtocol {
return "", nil, fmt.Errorf("only support tcp endpoint")
}
return addr, dial, nil
}
func dial(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(tcpProtocol, addr, timeout)
}