
Signed-off-by: lengrongfu <lenronfu@gmail.com> Signed-off-by: rongfu.leng <lenronfu@gmail.com>
1268 lines
31 KiB
Go
1268 lines
31 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package integration
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
goruntime "runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/containerd/nri/pkg/api"
|
|
"github.com/containerd/nri/pkg/stub"
|
|
"github.com/opencontainers/selinux/go-selinux"
|
|
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
|
|
|
cri "github.com/containerd/containerd/v2/integration/cri-api/pkg/apis"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/containerd/containerd/v2/integration/images"
|
|
)
|
|
|
|
// Settings shared by the NRI integration tests.
const (
	// nriTestSocket is stat()ed to decide whether NRI tests can run and is the
	// socket path mock plugins connect to.
	nriTestSocket = "/var/run/nri-test.sock"
	// pluginSyncTimeout bounds how long tests wait for a plugin event.
	pluginSyncTimeout = 3 * time.Second
	// containerStopTimeout is handed to StopContainer during container cleanup
	// (presumably seconds — confirm against the CRI runtime service).
	containerStopTimeout = 10
	// onlineCpus is the sysfs file read to find the available CPUs.
	onlineCpus = "/sys/devices/system/cpu/online"
	// normalMems is the sysfs file read to find the available memory nodes.
	normalMems = "/sys/devices/system/node/has_normal_memory"
)
|
|
|
|
// Caches filled on first use by getAvailableCpuset and getAvailableMemset.
var (
	// availableCpuset holds the IDs parsed from onlineCpus.
	availableCpuset []string
	// availableMemset holds the IDs parsed from normalMems.
	availableMemset []string
)
|
|
|
|
// skipNriTestIfNecessary skips NRI tests if necessary.
|
|
func skipNriTestIfNecessary(t *testing.T, extraSkipChecks ...map[string]bool) {
|
|
if goruntime.GOOS != "linux" {
|
|
t.Skip("Not running on linux")
|
|
}
|
|
|
|
if selinux.GetEnabled() {
|
|
// https://github.com/containerd/containerd/pull/7892#issuecomment-1369825603
|
|
t.Skip("SELinux relabeling is not supported for NRI yet")
|
|
}
|
|
_, err := os.Stat(nriTestSocket)
|
|
if err != nil {
|
|
t.Skip("Containerd test instance does not have NRI enabled")
|
|
}
|
|
|
|
for _, checks := range extraSkipChecks {
|
|
for check, skip := range checks {
|
|
if skip {
|
|
t.Skip(check)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test successful Nri plugin setup.
|
|
func TestNriPluginSetup(t *testing.T) {
|
|
skipNriTestIfNecessary(t)
|
|
|
|
t.Log("Test that NRI plugins can connect and get set up.")
|
|
|
|
var (
|
|
tc = &nriTest{
|
|
t: t,
|
|
plugins: []*mockPlugin{
|
|
{},
|
|
{},
|
|
{},
|
|
},
|
|
}
|
|
)
|
|
|
|
tc.setup()
|
|
}
|
|
|
|
// Test NRI runtime/plugin state synchronization.
|
|
func TestNriPluginSynchronization(t *testing.T) {
|
|
skipNriTestIfNecessary(t)
|
|
|
|
t.Log("Test that NRI plugins get properly synchronized with the runtime state.")
|
|
|
|
var (
|
|
tc = &nriTest{
|
|
t: t,
|
|
}
|
|
podCount = 3
|
|
ctrPerPod = 2
|
|
)
|
|
|
|
tc.setup()
|
|
|
|
for i := 0; i < podCount; i++ {
|
|
podID := tc.runPod(fmt.Sprintf("pod%d", i))
|
|
for j := 0; j < ctrPerPod; j++ {
|
|
tc.startContainer(podID, fmt.Sprintf("ctr%d", j))
|
|
}
|
|
}
|
|
|
|
for _, plugin := range []*mockPlugin{{}, {}, {}} {
|
|
tc.connectNewPlugin(plugin)
|
|
}
|
|
|
|
for _, plugin := range tc.plugins {
|
|
err := plugin.Wait(PluginSynchronized, time.After(pluginSyncTimeout))
|
|
require.NoError(t, err, "plugin sync wait")
|
|
}
|
|
|
|
for _, id := range tc.pods {
|
|
for _, plugin := range tc.plugins {
|
|
_, ok := plugin.pods[id]
|
|
require.True(tc.t, ok, "runtime sync of pod "+id)
|
|
}
|
|
}
|
|
|
|
for _, id := range tc.ctrs {
|
|
for _, plugin := range tc.plugins {
|
|
_, ok := plugin.ctrs[id]
|
|
require.True(t, ok, "runtime sync of container "+id)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test mount injection into containers by NRI plugins.
|
|
func TestNriMountInjection(t *testing.T) {
|
|
skipNriTestIfNecessary(t)
|
|
|
|
t.Log("Test that NRI plugins can inject mounts into containers.")
|
|
|
|
var (
|
|
out = t.TempDir()
|
|
tc = &nriTest{
|
|
t: t,
|
|
plugins: []*mockPlugin{{}},
|
|
}
|
|
injectMount = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
adjust := &api.ContainerAdjustment{}
|
|
adjust.AddMount(&api.Mount{
|
|
Destination: "/out",
|
|
Source: out,
|
|
Type: "bind",
|
|
Options: []string{"bind"},
|
|
})
|
|
return adjust, nil, nil
|
|
}
|
|
)
|
|
|
|
tc.plugins[0].createContainer = injectMount
|
|
tc.setup()
|
|
|
|
msg := fmt.Sprintf("hello process %d", os.Getpid())
|
|
cmd := "echo " + msg + " > /out/result; sleep 3600"
|
|
|
|
podID := tc.runPod("pod0")
|
|
tc.startContainer(podID, "ctr0",
|
|
WithCommand("/bin/sh", "-c", cmd),
|
|
)
|
|
|
|
chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second)
|
|
require.NoError(t, err, "read result")
|
|
require.Equal(t, msg+"\n", string(chk), "check result")
|
|
}
|
|
|
|
// Test environment variable injection by NRI plugins.
|
|
func TestNriEnvironmentInjection(t *testing.T) {
|
|
skipNriTestIfNecessary(t)
|
|
|
|
t.Log("Test that NRI plugins can inject environment variables into containers.")
|
|
|
|
var (
|
|
out = t.TempDir()
|
|
tc = &nriTest{
|
|
t: t,
|
|
plugins: []*mockPlugin{{}},
|
|
}
|
|
injectEnv = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
adjust := &api.ContainerAdjustment{}
|
|
adjust.AddMount(&api.Mount{
|
|
Destination: "/out",
|
|
Source: out,
|
|
Type: "bind",
|
|
Options: []string{"bind"},
|
|
})
|
|
adjust.AddEnv("TEST_ENV_NAME", "TEST_ENV_VALUE")
|
|
return adjust, nil, nil
|
|
}
|
|
)
|
|
|
|
tc.plugins[0].createContainer = injectEnv
|
|
tc.setup()
|
|
|
|
podID := tc.runPod("pod0")
|
|
tc.startContainer(podID, "ctr0",
|
|
WithCommand("/bin/sh", "-c", "echo $TEST_ENV_NAME > /out/result; sleep 3600"),
|
|
)
|
|
|
|
chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second)
|
|
require.NoError(t, err, "read result")
|
|
require.Equal(t, "TEST_ENV_VALUE\n", string(chk), "check result")
|
|
}
|
|
|
|
// Test annotation injection by NRI plugins.
|
|
func TestNriAnnotationInjection(t *testing.T) {
|
|
skipNriTestIfNecessary(t)
|
|
|
|
t.Log("Test that NRI plugins can inject annotations into containers.")
|
|
|
|
var (
|
|
key = "TEST_ANNOTATION_KEY"
|
|
value = "TEST_ANNOTATION_VALUE"
|
|
tc = &nriTest{
|
|
t: t,
|
|
plugins: []*mockPlugin{{}},
|
|
}
|
|
injectAnnotation = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
adjust := &api.ContainerAdjustment{}
|
|
adjust.AddAnnotation(key, value)
|
|
return adjust, nil, nil
|
|
}
|
|
)
|
|
|
|
tc.plugins[0].createContainer = injectAnnotation
|
|
tc.setup()
|
|
|
|
podID := tc.runPod("pod0")
|
|
id := tc.startContainer(podID, "ctr0")
|
|
|
|
timeout := time.After(pluginSyncTimeout)
|
|
err := tc.plugins[0].Wait(ContainerEvent(tc.plugins[0].ctrs[id], PostCreateContainer), timeout)
|
|
require.NoError(t, err, "wait for container post-create event")
|
|
require.Equal(t, value, tc.plugins[0].ctrs[id].Annotations[key], "updated annotations")
|
|
}
|
|
|
|
// Test linux device injection by NRI plugins.
|
|
func TestNriLinuxDeviceInjection(t *testing.T) {
|
|
skipNriTestIfNecessary(t)
|
|
|
|
t.Log("Test that NRI plugins can inject linux devices into containers.")
|
|
|
|
var (
|
|
out = t.TempDir()
|
|
tc = &nriTest{
|
|
t: t,
|
|
plugins: []*mockPlugin{{}},
|
|
}
|
|
injectDev = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
adjust := &api.ContainerAdjustment{}
|
|
adjust.AddMount(&api.Mount{
|
|
Destination: "/out",
|
|
Source: out,
|
|
Type: "bind",
|
|
Options: []string{"bind"},
|
|
})
|
|
adjust.AddDevice(&api.LinuxDevice{
|
|
Path: "/dev/pie",
|
|
Type: "c",
|
|
Major: 31,
|
|
Minor: 41,
|
|
Uid: api.UInt32(uint32(11)),
|
|
Gid: api.UInt32(uint32(22)),
|
|
FileMode: api.FileMode(uint32(0664)),
|
|
})
|
|
return adjust, nil, nil
|
|
}
|
|
)
|
|
|
|
tc.plugins[0].createContainer = injectDev
|
|
tc.setup()
|
|
|
|
podID := tc.runPod("pod0")
|
|
tc.startContainer(podID, "ctr0",
|
|
WithCommand("/bin/sh", "-c", "stat -c %F-%a-%u:%g-%t:%T /dev/pie > /out/result; sleep 3600"),
|
|
)
|
|
|
|
chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second)
|
|
require.NoError(t, err, "read result")
|
|
require.Equal(t, "character special file-664-11:22-1f:29\n", string(chk), "check result")
|
|
}
|
|
|
|
// Test linux cpuset adjustment by NRI plugins.
|
|
func TestNriLinuxCpusetAdjustment(t *testing.T) {
|
|
skipNriTestIfNecessary(t,
|
|
map[string]bool{
|
|
"not enough online CPUs for test": len(getAvailableCpuset(t)) < 2,
|
|
},
|
|
)
|
|
|
|
t.Log("Test that NRI plugins can adjust linux cpusets of containers.")
|
|
|
|
var (
|
|
out = t.TempDir()
|
|
tc = &nriTest{
|
|
t: t,
|
|
plugins: []*mockPlugin{{}},
|
|
}
|
|
adjustCpuset = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
adjust := &api.ContainerAdjustment{}
|
|
adjust.AddMount(&api.Mount{
|
|
Destination: "/out",
|
|
Source: out,
|
|
Type: "bind",
|
|
Options: []string{"bind"},
|
|
})
|
|
adjust.SetLinuxCPUSetCPUs(availableCpuset[1])
|
|
return adjust, nil, nil
|
|
}
|
|
)
|
|
|
|
tc.plugins[0].createContainer = adjustCpuset
|
|
tc.setup()
|
|
|
|
podID := tc.runPod("pod0")
|
|
tc.startContainer(podID, "ctr0",
|
|
WithCommand("/bin/sh", "-c", "grep Cpus_allowed_list: /proc/self/status > /out/result; "+
|
|
"sleep 3600"),
|
|
)
|
|
|
|
chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second)
|
|
require.NoError(t, err, "read result")
|
|
|
|
expected := "Cpus_allowed_list:\t" + availableCpuset[1] + "\n"
|
|
require.Equal(t, expected, string(chk), "check result")
|
|
}
|
|
|
|
// Test linux memset adjustment by NRI plugins.
|
|
func TestNriLinuxMemsetAdjustment(t *testing.T) {
|
|
skipNriTestIfNecessary(t,
|
|
map[string]bool{
|
|
"not enough online memory nodes for test": len(getAvailableMemset(t)) < 2,
|
|
},
|
|
)
|
|
|
|
t.Log("Test that NRI plugins can adjust linux memsets of containers.")
|
|
|
|
var (
|
|
out = t.TempDir()
|
|
tc = &nriTest{
|
|
t: t,
|
|
plugins: []*mockPlugin{{}},
|
|
}
|
|
adjustMemset = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
adjust := &api.ContainerAdjustment{}
|
|
adjust.AddMount(&api.Mount{
|
|
Destination: "/out",
|
|
Source: out,
|
|
Type: "bind",
|
|
Options: []string{"bind"},
|
|
})
|
|
adjust.SetLinuxCPUSetMems(availableMemset[1])
|
|
return adjust, nil, nil
|
|
}
|
|
)
|
|
|
|
tc.plugins[0].createContainer = adjustMemset
|
|
tc.setup()
|
|
|
|
podID := tc.runPod("pod0")
|
|
tc.startContainer(podID, "ctr0",
|
|
WithCommand("/bin/sh", "-c", "grep Mems_allowed_list: /proc/self/status > /out/result; "+
|
|
"sleep 3600"),
|
|
)
|
|
|
|
chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second)
|
|
require.NoError(t, err, "read result")
|
|
|
|
expected := "Mems_allowed_list:\t" + availableMemset[1] + "\n"
|
|
require.Equal(t, expected, string(chk), "check result")
|
|
}
|
|
|
|
// Test creation-time linux cpuset update of existing containers by NRI plugins.
|
|
func TestNriLinuxCpusetAdjustmentUpdate(t *testing.T) {
|
|
skipNriTestIfNecessary(t,
|
|
map[string]bool{
|
|
"not enough online CPUs for test": len(getAvailableCpuset(t)) < 2,
|
|
},
|
|
)
|
|
|
|
t.Log("Test that NRI plugins can update linux cpusets of existing containers.")
|
|
t.Logf("availableCpuset values is %v", availableCpuset)
|
|
|
|
var (
|
|
out = t.TempDir()
|
|
tc = &nriTest{
|
|
t: t,
|
|
plugins: []*mockPlugin{{}},
|
|
}
|
|
ctr0 string
|
|
updateCpuset = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
var (
|
|
adjust *api.ContainerAdjustment
|
|
update []*api.ContainerUpdate
|
|
)
|
|
if ctr.Name == "ctr0" {
|
|
ctr0 = ctr.Id
|
|
adjust = &api.ContainerAdjustment{}
|
|
adjust.AddMount(&api.Mount{
|
|
Destination: "/out",
|
|
Source: out,
|
|
Type: "bind",
|
|
Options: []string{"bind"},
|
|
})
|
|
t.Logf("ctr0 availableCpuset values is %v", availableCpuset)
|
|
adjust.SetLinuxCPUSetCPUs(availableCpuset[0])
|
|
} else {
|
|
update = []*api.ContainerUpdate{{}}
|
|
update[0].SetContainerId(ctr0)
|
|
t.Logf("ctr1 availableCpuset values is %v", availableCpuset)
|
|
update[0].SetLinuxCPUSetCPUs(availableCpuset[1])
|
|
}
|
|
return adjust, update, nil
|
|
}
|
|
)
|
|
|
|
tc.plugins[0].createContainer = updateCpuset
|
|
tc.setup()
|
|
|
|
podID := tc.runPod("pod0")
|
|
tc.startContainer(podID, "ctr0",
|
|
WithCommand("/bin/sh", "-c",
|
|
`prev=""
|
|
while true; do
|
|
cpus="$(grep Cpus_allowed_list: /proc/self/status)"
|
|
[ "$cpus" != "$prev" ] && echo "$cpus" > /out/result
|
|
cpus="$prev"
|
|
sleep 0.2
|
|
done`),
|
|
)
|
|
tc.startContainer(podID, "ctr1",
|
|
WithCommand("/bin/sh", "-c", "sleep 3600"),
|
|
)
|
|
|
|
time.Sleep(1 * time.Second)
|
|
|
|
chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second)
|
|
require.NoError(t, err, "read result")
|
|
|
|
expected := "Cpus_allowed_list:\t" + availableCpuset[1] + "\n"
|
|
require.Equal(t, expected, string(chk), "check result")
|
|
}
|
|
|
|
// Test creation-time linux memset update of existing containers by NRI plugins.
|
|
func TestNriLinuxMemsetAdjustmentUpdate(t *testing.T) {
|
|
skipNriTestIfNecessary(t,
|
|
map[string]bool{
|
|
"not enough online memory nodes for test": len(getAvailableMemset(t)) < 2,
|
|
},
|
|
)
|
|
|
|
t.Log("Test that NRI plugins can update linux memsets of existing containers.")
|
|
|
|
var (
|
|
out = t.TempDir()
|
|
tc = &nriTest{
|
|
t: t,
|
|
plugins: []*mockPlugin{{}},
|
|
}
|
|
ctr0 string
|
|
updateMemset = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
var (
|
|
adjust *api.ContainerAdjustment
|
|
update []*api.ContainerUpdate
|
|
)
|
|
if ctr.Name == "ctr0" {
|
|
ctr0 = ctr.Id
|
|
adjust = &api.ContainerAdjustment{}
|
|
adjust.AddMount(&api.Mount{
|
|
Destination: "/out",
|
|
Source: out,
|
|
Type: "bind",
|
|
Options: []string{"bind"},
|
|
})
|
|
adjust.SetLinuxCPUSetMems(availableMemset[0])
|
|
} else {
|
|
update = []*api.ContainerUpdate{{}}
|
|
update[0].SetContainerId(ctr0)
|
|
update[0].SetLinuxCPUSetMems(availableMemset[1])
|
|
}
|
|
return adjust, update, nil
|
|
}
|
|
)
|
|
|
|
tc.plugins[0].createContainer = updateMemset
|
|
tc.setup()
|
|
|
|
podID := tc.runPod("pod0")
|
|
tc.startContainer(podID, "ctr0",
|
|
WithCommand("/bin/sh", "-c",
|
|
`prev=""
|
|
while true; do
|
|
mems="$(grep Mems_allowed_list: /proc/self/status)"
|
|
[ "$mems" != "$prev" ] && echo "$mems" > /out/result
|
|
mems="$prev"
|
|
sleep 0.2
|
|
done`),
|
|
)
|
|
tc.startContainer(podID, "ctr1",
|
|
WithCommand("/bin/sh", "-c", "sleep 3600"),
|
|
)
|
|
|
|
time.Sleep(1 * time.Second)
|
|
|
|
chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second)
|
|
require.NoError(t, err, "read result")
|
|
|
|
expected := "Mems_allowed_list:\t" + availableMemset[1] + "\n"
|
|
require.Equal(t, expected, string(chk), "check result")
|
|
}
|
|
|
|
// Test NRI vs. containerd restart.
|
|
func TestNriPluginContainerdRestart(t *testing.T) {
|
|
skipNriTestIfNecessary(t)
|
|
|
|
t.Log("Test NRI plugins vs. containerd restart.")
|
|
|
|
var (
|
|
tc = &nriTest{
|
|
t: t,
|
|
}
|
|
podCount = 3
|
|
ctrPerPod = 2
|
|
)
|
|
|
|
tc.setup()
|
|
|
|
for i := 0; i < podCount; i++ {
|
|
podID := tc.runPod(fmt.Sprintf("pod%d", i))
|
|
for j := 0; j < ctrPerPod; j++ {
|
|
tc.startContainer(podID, fmt.Sprintf("ctr%d", j))
|
|
}
|
|
}
|
|
|
|
for _, plugin := range []*mockPlugin{{}, {}, {}} {
|
|
tc.connectNewPlugin(plugin)
|
|
}
|
|
|
|
for _, plugin := range tc.plugins {
|
|
err := plugin.Wait(PluginSynchronized, time.After(pluginSyncTimeout))
|
|
require.NoError(t, err, "plugin sync wait")
|
|
}
|
|
|
|
for _, id := range tc.pods {
|
|
for _, plugin := range tc.plugins {
|
|
_, ok := plugin.pods[id]
|
|
require.True(tc.t, ok, "runtime sync of pod "+id)
|
|
}
|
|
}
|
|
|
|
for _, id := range tc.ctrs {
|
|
for _, plugin := range tc.plugins {
|
|
_, ok := plugin.ctrs[id]
|
|
require.True(t, ok, "runtime sync of container "+id)
|
|
}
|
|
}
|
|
|
|
t.Logf("Restart containerd")
|
|
RestartContainerd(t, syscall.SIGTERM)
|
|
|
|
for _, plugin := range tc.plugins {
|
|
require.True(t, plugin.closed, "plugin connection closed")
|
|
}
|
|
}
|
|
|
|
// An NRI test along with its state.
|
|
type nriTest struct {
|
|
t *testing.T
|
|
name string
|
|
prefix string
|
|
runtime cri.RuntimeService
|
|
plugins []*mockPlugin
|
|
namespace string
|
|
sbCfg map[string]*runtime.PodSandboxConfig
|
|
pods []string
|
|
ctrs []string
|
|
}
|
|
|
|
// setup prepares the test environment, connects all NRI plugins.
|
|
func (tc *nriTest) setup() {
|
|
if tc.name == "" {
|
|
tc.name = tc.t.Name()
|
|
}
|
|
if tc.prefix == "" {
|
|
tc.prefix = strings.ToLower(tc.name)
|
|
}
|
|
if tc.namespace == "" {
|
|
tc.namespace = tc.prefix + "-" + strconv.Itoa(os.Getpid())
|
|
}
|
|
|
|
tc.sbCfg = make(map[string]*runtime.PodSandboxConfig)
|
|
tc.runtime = runtimeService
|
|
|
|
for idx, p := range tc.plugins {
|
|
p.namespace = tc.namespace
|
|
|
|
if p.logf == nil {
|
|
p.logf = tc.t.Logf
|
|
}
|
|
if p.idx == "" {
|
|
p.idx = fmt.Sprintf("%02d", idx)
|
|
}
|
|
|
|
err := p.Start()
|
|
require.NoError(tc.t, err, "start plugin")
|
|
|
|
err = p.Wait(PluginSynchronized, time.After(pluginSyncTimeout))
|
|
require.NoError(tc.t, err, "wait for plugin setup")
|
|
}
|
|
}
|
|
|
|
// Connect a new NRI plugin to the runtime.
|
|
func (tc *nriTest) connectNewPlugin(p *mockPlugin) {
|
|
p.namespace = tc.namespace
|
|
|
|
if p.logf == nil {
|
|
p.logf = tc.t.Logf
|
|
}
|
|
if p.idx == "" {
|
|
p.idx = fmt.Sprintf("%02d", len(tc.plugins))
|
|
}
|
|
|
|
tc.plugins = append(tc.plugins, p)
|
|
|
|
err := p.Start()
|
|
require.NoError(tc.t, err, "start plugin")
|
|
|
|
err = p.Wait(PluginSynchronized, time.After(pluginSyncTimeout))
|
|
require.NoError(tc.t, err, "wait for plugin setup")
|
|
}
|
|
|
|
// Create a pod in a namespace specific to the test case.
|
|
func (tc *nriTest) runPod(name string) string {
|
|
id, cfg := PodSandboxConfigWithCleanup(tc.t, name, tc.namespace)
|
|
tc.sbCfg[id] = cfg
|
|
tc.pods = append(tc.pods, id)
|
|
return id
|
|
}
|
|
|
|
// Start a container in a (testcase-specific) pod.
|
|
func (tc *nriTest) startContainer(podID, name string, opts ...ContainerOpts) string {
|
|
podCfg := tc.sbCfg[podID]
|
|
require.NotNil(tc.t, podCfg, "pod config for "+podID)
|
|
|
|
ctrCfg := ContainerConfig(name, "", opts...)
|
|
if ctrCfg.Image.Image == "" {
|
|
image := images.Get(images.BusyBox)
|
|
EnsureImageExists(tc.t, image)
|
|
ctrCfg.Image.Image = image
|
|
}
|
|
|
|
if len(ctrCfg.Command) == 0 {
|
|
WithCommand("/bin/sh", "-c", "sleep 3600")(ctrCfg)
|
|
}
|
|
|
|
ctrID, err := tc.runtime.CreateContainer(podID, ctrCfg, podCfg)
|
|
assert.NoError(tc.t, err, "create container")
|
|
assert.NoError(tc.t, tc.runtime.StartContainer(ctrID), "start container")
|
|
|
|
tc.ctrs = append(tc.ctrs, ctrID)
|
|
|
|
tc.t.Cleanup(func() {
|
|
assert.NoError(tc.t, tc.runtime.StopContainer(ctrID, containerStopTimeout))
|
|
assert.NoError(tc.t, tc.runtime.RemoveContainer(ctrID))
|
|
})
|
|
|
|
tc.t.Logf("created/started container %s/%s/%s with ID %s",
|
|
podCfg.Metadata.Namespace, podCfg.Metadata.Name, name, ctrID)
|
|
|
|
return ctrID
|
|
}
|
|
|
|
//
|
|
// NRI plugin implementation for integration tests
|
|
//
|
|
|
|
type mockPlugin struct {
|
|
name string
|
|
idx string
|
|
stub stub.Stub
|
|
mask stub.EventMask
|
|
|
|
q *eventQ
|
|
pods map[string]*api.PodSandbox
|
|
ctrs map[string]*api.Container
|
|
|
|
closed bool
|
|
namespace string
|
|
logf func(string, ...interface{})
|
|
synchronize func(*mockPlugin, []*api.PodSandbox, []*api.Container) ([]*api.ContainerUpdate, error)
|
|
createContainer func(*mockPlugin, *api.PodSandbox, *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error)
|
|
postCreateContainer func(*mockPlugin, *api.PodSandbox, *api.Container)
|
|
updateContainer func(*mockPlugin, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error)
|
|
postUpdateContainer func(*mockPlugin, *api.PodSandbox, *api.Container)
|
|
stopContainer func(*mockPlugin, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error)
|
|
}
|
|
|
|
func (m *mockPlugin) Start() error {
|
|
var err error
|
|
|
|
if m.name == "" {
|
|
m.name = "mock-plugin"
|
|
}
|
|
if m.idx == "" {
|
|
m.idx = "00"
|
|
}
|
|
|
|
if m.mask == 0 {
|
|
m.mask.Set(
|
|
api.Event_RUN_POD_SANDBOX,
|
|
api.Event_STOP_POD_SANDBOX,
|
|
api.Event_REMOVE_POD_SANDBOX,
|
|
api.Event_CREATE_CONTAINER,
|
|
api.Event_POST_CREATE_CONTAINER,
|
|
api.Event_START_CONTAINER,
|
|
api.Event_POST_START_CONTAINER,
|
|
api.Event_UPDATE_CONTAINER,
|
|
api.Event_POST_UPDATE_CONTAINER,
|
|
api.Event_STOP_CONTAINER,
|
|
api.Event_REMOVE_CONTAINER,
|
|
)
|
|
}
|
|
|
|
m.stub, err = stub.New(m,
|
|
stub.WithPluginName(m.name),
|
|
stub.WithPluginIdx(m.idx),
|
|
stub.WithSocketPath(nriTestSocket),
|
|
stub.WithOnClose(m.onClose),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if m.logf == nil {
|
|
m.logf = func(format string, args ...interface{}) {
|
|
fmt.Printf(format+"\n", args...)
|
|
}
|
|
}
|
|
|
|
if m.synchronize == nil {
|
|
m.synchronize = nopSynchronize
|
|
}
|
|
if m.createContainer == nil {
|
|
m.createContainer = nopCreateContainer
|
|
}
|
|
if m.postCreateContainer == nil {
|
|
m.postCreateContainer = nopEvent
|
|
}
|
|
if m.updateContainer == nil {
|
|
m.updateContainer = nopUpdateContainer
|
|
}
|
|
if m.postUpdateContainer == nil {
|
|
m.postUpdateContainer = nopEvent
|
|
}
|
|
if m.stopContainer == nil {
|
|
m.stopContainer = nopStopContainer
|
|
}
|
|
|
|
m.q = &eventQ{}
|
|
m.pods = make(map[string]*api.PodSandbox)
|
|
m.ctrs = make(map[string]*api.Container)
|
|
|
|
err = m.stub.Start(context.Background())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *mockPlugin) Stop() {
|
|
if m.stub != nil {
|
|
m.stub.Stop()
|
|
m.stub.Wait()
|
|
}
|
|
}
|
|
|
|
func (m *mockPlugin) onClose() {
|
|
m.closed = true
|
|
if m.stub != nil {
|
|
m.stub.Stop()
|
|
m.stub.Wait()
|
|
}
|
|
}
|
|
|
|
func (m *mockPlugin) inNamespace(namespace string) bool {
|
|
return strings.HasPrefix(namespace, m.namespace)
|
|
}
|
|
|
|
func (m *mockPlugin) Log(format string, args ...interface{}) {
|
|
m.logf(fmt.Sprintf("[plugin:%s-%s] ", m.idx, m.name)+format, args...)
|
|
}
|
|
|
|
func (m *mockPlugin) Configure(ctx context.Context, cfg string) (stub.EventMask, error) {
|
|
return m.mask, nil
|
|
}
|
|
|
|
func (m *mockPlugin) Synchronize(ctx context.Context, pods []*api.PodSandbox, ctrs []*api.Container) ([]*api.ContainerUpdate, error) {
|
|
m.Log("Synchronize")
|
|
for _, pod := range pods {
|
|
m.Log(" - pod %s", pod.Id)
|
|
m.pods[pod.Id] = pod
|
|
}
|
|
for _, ctr := range ctrs {
|
|
m.Log(" - ctr %s", ctr.Id)
|
|
m.ctrs[ctr.Id] = ctr
|
|
}
|
|
|
|
m.q.Add(PluginSynchronized)
|
|
|
|
return m.synchronize(m, pods, ctrs)
|
|
}
|
|
|
|
func (m *mockPlugin) Shutdown(ctx context.Context) {
|
|
m.Log("Shutdown")
|
|
}
|
|
|
|
func (m *mockPlugin) RunPodSandbox(ctx context.Context, pod *api.PodSandbox) error {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil
|
|
}
|
|
|
|
m.Log("RunPodSandbox %s/%s", pod.Namespace, pod.Name)
|
|
m.pods[pod.Id] = pod
|
|
m.q.Add(PodSandboxEvent(pod, RunPodSandbox))
|
|
return nil
|
|
}
|
|
|
|
func (m *mockPlugin) StopPodSandbox(ctx context.Context, pod *api.PodSandbox) error {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil
|
|
}
|
|
|
|
m.Log("StopPodSandbox %s/%s", pod.Namespace, pod.Name)
|
|
m.pods[pod.Id] = pod
|
|
m.q.Add(PodSandboxEvent(pod, StopPodSandbox))
|
|
return nil
|
|
}
|
|
|
|
func (m *mockPlugin) RemovePodSandbox(ctx context.Context, pod *api.PodSandbox) error {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil
|
|
}
|
|
|
|
m.Log("RemovePodSandbox %s/%s", pod.Namespace, pod.Name)
|
|
delete(m.pods, pod.Id)
|
|
m.q.Add(PodSandboxEvent(pod, RemovePodSandbox))
|
|
return nil
|
|
}
|
|
|
|
func (m *mockPlugin) CreateContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil, nil, nil
|
|
}
|
|
|
|
m.Log("CreateContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name)
|
|
m.pods[pod.Id] = pod
|
|
m.ctrs[ctr.Id] = ctr
|
|
m.q.Add(ContainerEvent(ctr, CreateContainer))
|
|
|
|
return m.createContainer(m, pod, ctr)
|
|
}
|
|
|
|
func (m *mockPlugin) PostCreateContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil
|
|
}
|
|
|
|
m.Log("PostCreateContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name)
|
|
m.pods[pod.Id] = pod
|
|
m.ctrs[ctr.Id] = ctr
|
|
m.q.Add(ContainerEvent(ctr, PostCreateContainer))
|
|
m.postCreateContainer(m, pod, ctr)
|
|
return nil
|
|
}
|
|
|
|
func (m *mockPlugin) StartContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil
|
|
}
|
|
|
|
m.Log("StartContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name)
|
|
m.pods[pod.Id] = pod
|
|
m.ctrs[ctr.Id] = ctr
|
|
m.q.Add(ContainerEvent(ctr, StartContainer))
|
|
return nil
|
|
}
|
|
|
|
func (m *mockPlugin) PostStartContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil
|
|
}
|
|
|
|
m.Log("PostStartContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name)
|
|
m.pods[pod.Id] = pod
|
|
m.ctrs[ctr.Id] = ctr
|
|
m.q.Add(ContainerEvent(ctr, PostStartContainer))
|
|
return nil
|
|
}
|
|
|
|
func (m *mockPlugin) UpdateContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) ([]*api.ContainerUpdate, error) {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil, nil
|
|
}
|
|
|
|
m.Log("UpdateContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name)
|
|
m.pods[pod.Id] = pod
|
|
m.ctrs[ctr.Id] = ctr
|
|
m.q.Add(ContainerEvent(ctr, UpdateContainer))
|
|
return m.updateContainer(m, pod, ctr)
|
|
}
|
|
|
|
func (m *mockPlugin) PostUpdateContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil
|
|
}
|
|
|
|
m.Log("PostUpdateContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name)
|
|
m.pods[pod.Id] = pod
|
|
m.ctrs[ctr.Id] = ctr
|
|
m.q.Add(ContainerEvent(ctr, PostUpdateContainer))
|
|
m.postUpdateContainer(m, pod, ctr)
|
|
return nil
|
|
}
|
|
|
|
func (m *mockPlugin) StopContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) ([]*api.ContainerUpdate, error) {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil, nil
|
|
}
|
|
|
|
m.Log("StopContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name)
|
|
m.pods[pod.Id] = pod
|
|
m.ctrs[ctr.Id] = ctr
|
|
m.q.Add(ContainerEvent(ctr, StopContainer))
|
|
return m.stopContainer(m, pod, ctr)
|
|
}
|
|
|
|
func (m *mockPlugin) RemoveContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error {
|
|
if !m.inNamespace(pod.Namespace) {
|
|
return nil
|
|
}
|
|
|
|
m.Log("RemoveContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name)
|
|
delete(m.pods, pod.Id)
|
|
delete(m.ctrs, ctr.Id)
|
|
m.q.Add(ContainerEvent(ctr, RemoveContainer))
|
|
return nil
|
|
}
|
|
|
|
func (m *mockPlugin) Wait(e *Event, deadline <-chan time.Time) error {
|
|
_, err := m.q.Wait(e, deadline)
|
|
return err
|
|
}
|
|
|
|
func nopSynchronize(*mockPlugin, []*api.PodSandbox, []*api.Container) ([]*api.ContainerUpdate, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func nopCreateContainer(*mockPlugin, *api.PodSandbox, *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
|
|
return nil, nil, nil
|
|
}
|
|
|
|
func nopUpdateContainer(*mockPlugin, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func nopStopContainer(*mockPlugin, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func nopEvent(*mockPlugin, *api.PodSandbox, *api.Container) {
|
|
}
|
|
|
|
//
|
|
// plugin event and event recording
|
|
//
|
|
|
|
// EventType names the kind of an Event recorded by a mock plugin.
type EventType string
|
|
|
|
// Event type names. They are untyped string constants, so they can be used
// wherever an EventType (or a plain string) is expected.
const (
	// Plugin lifecycle events.
	Configured   = "configured"
	Synchronized = "synchronized"
	StartupError = "startup-error"
	Shutdown     = "shutdown"
	Disconnected = "closed"
	Stopped      = "stopped"

	// Pod and container requests.
	RunPodSandbox       = "RunPodSandbox"
	StopPodSandbox      = "StopPodSandbox"
	RemovePodSandbox    = "RemovePodSandbox"
	CreateContainer     = "CreateContainer"
	StartContainer      = "StartContainer"
	UpdateContainer     = "UpdateContainer"
	StopContainer       = "StopContainer"
	RemoveContainer     = "RemoveContainer"
	PostCreateContainer = "PostCreateContainer"
	PostStartContainer  = "PostStartContainer"
	PostUpdateContainer = "PostUpdateContainer"

	// Miscellaneous.
	Error   = "Error"
	Timeout = ""
)
|
|
|
|
type Event struct {
|
|
Type EventType
|
|
Pod string
|
|
Ctr string
|
|
}
|
|
|
|
var (
|
|
PluginConfigured = &Event{Type: Configured}
|
|
PluginSynchronized = &Event{Type: Synchronized}
|
|
PluginStartupError = &Event{Type: StartupError}
|
|
PluginShutdown = &Event{Type: Shutdown}
|
|
PluginDisconnected = &Event{Type: Disconnected}
|
|
PluginStopped = &Event{Type: Stopped}
|
|
)
|
|
|
|
func PodSandboxEvent(pod *api.PodSandbox, t EventType) *Event {
|
|
return &Event{Type: t, Pod: pod.Id}
|
|
}
|
|
|
|
func ContainerEvent(ctr *api.Container, t EventType) *Event {
|
|
return &Event{Type: t, Ctr: ctr.Id}
|
|
}
|
|
|
|
func PodRefEvent(pod string, t EventType) *Event {
|
|
return &Event{Type: t, Pod: pod}
|
|
}
|
|
|
|
func ContainerRefEvent(pod, ctr string, t EventType) *Event {
|
|
return &Event{Type: t, Pod: pod, Ctr: ctr}
|
|
}
|
|
|
|
func (e *Event) Matches(o *Event) bool {
|
|
if e.Type != o.Type {
|
|
return false
|
|
}
|
|
if e.Pod != "" && o.Pod != "" {
|
|
if e.Pod != o.Pod {
|
|
return false
|
|
}
|
|
}
|
|
if e.Ctr != "" && o.Ctr != "" {
|
|
if e.Ctr != o.Ctr {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (e *Event) String() string {
|
|
str, sep := "", ""
|
|
if e.Pod != "" {
|
|
str = e.Pod
|
|
sep = ":"
|
|
}
|
|
if e.Ctr != "" {
|
|
str += sep + e.Ctr
|
|
sep = "/"
|
|
}
|
|
return str + sep + string(e.Type)
|
|
}
|
|
|
|
type eventQ struct {
|
|
sync.Mutex
|
|
q []*Event
|
|
c chan *Event
|
|
}
|
|
|
|
func (q *eventQ) Add(e *Event) {
|
|
if q == nil {
|
|
return
|
|
}
|
|
q.Lock()
|
|
defer q.Unlock()
|
|
q.q = append(q.q, e)
|
|
if q.c != nil {
|
|
q.c <- e
|
|
}
|
|
}
|
|
|
|
func (q *eventQ) Reset(e *Event) {
|
|
q.Lock()
|
|
defer q.Unlock()
|
|
q.q = []*Event{}
|
|
}
|
|
|
|
func (q *eventQ) Events() []*Event {
|
|
q.Lock()
|
|
defer q.Unlock()
|
|
var events []*Event
|
|
events = append(events, q.q...)
|
|
return events
|
|
}
|
|
|
|
func (q *eventQ) Has(e *Event) bool {
|
|
q.Lock()
|
|
defer q.Unlock()
|
|
return q.search(e) != nil
|
|
}
|
|
|
|
func (q *eventQ) search(e *Event) *Event {
|
|
for _, qe := range q.q {
|
|
if qe.Matches(e) {
|
|
return qe
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *eventQ) Wait(w *Event, deadline <-chan time.Time) (*Event, error) {
|
|
var unlocked bool
|
|
q.Lock()
|
|
defer func() {
|
|
if !unlocked {
|
|
q.Unlock()
|
|
}
|
|
}()
|
|
|
|
if e := q.search(w); e != nil {
|
|
return e, nil
|
|
}
|
|
|
|
if q.c != nil {
|
|
return nil, fmt.Errorf("event queue already busy Wait()ing")
|
|
}
|
|
q.c = make(chan *Event, 16)
|
|
defer func() {
|
|
c := q.c
|
|
q.c = nil
|
|
close(c)
|
|
}()
|
|
|
|
q.Unlock()
|
|
unlocked = true
|
|
for {
|
|
select {
|
|
case e := <-q.c:
|
|
if e.Matches(w) {
|
|
return e, nil
|
|
}
|
|
case <-deadline:
|
|
return nil, fmt.Errorf("event queue timed out Wait()ing for %s", w)
|
|
}
|
|
}
|
|
}
|
|
|
|
//
|
|
// helper functions
|
|
//
|
|
|
|
// waitForFileAndRead polls for path every 100ms until it can be stat()ed or
// timeout has elapsed, then pauses another 100ms before reading and returning
// the file's content. It returns an error if the file does not show up in
// time or cannot be read.
func waitForFileAndRead(path string, timeout time.Duration) ([]byte, error) {
	const slack = 100 * time.Millisecond

	expired := time.After(timeout)
	for {
		_, err := os.Stat(path)
		if err == nil {
			break
		}

		select {
		case <-expired:
			return nil, fmt.Errorf("waiting for %s timed out", path)
		default:
		}
		time.Sleep(slack)
	}

	// Leave the writer a moment to finish before reading.
	time.Sleep(slack)
	return os.ReadFile(path)
}
|
|
|
|
// getAvailableCpuset returns the set of online CPUs.
|
|
func getAvailableCpuset(t *testing.T) []string {
|
|
if availableCpuset == nil {
|
|
availableCpuset = getXxxset(t, "cpuset", onlineCpus)
|
|
}
|
|
return availableCpuset
|
|
}
|
|
|
|
// getAvailableMemset returns the set of usable NUMA nodes.
|
|
func getAvailableMemset(t *testing.T) []string {
|
|
if availableMemset == nil {
|
|
availableMemset = getXxxset(t, "memset", normalMems)
|
|
}
|
|
return availableMemset
|
|
}
|
|
|
|
// getXxxset reads the CPU/memory list stored at path and expands it into a
// slice with one decimal ID per element.
//
// The file is expected to hold comma-separated entries, each either a single
// ID ("3") or an inclusive range ("0-2"). kind is only used in failure
// messages.
//
// It returns nil when the file cannot be read (logged, not a test failure),
// when the list is empty, or when an entry is malformed (reported through
// t.Errorf). An empty list counts as "nothing available", so callers can skip
// the test instead of failing it.
func getXxxset(t *testing.T, kind, path string) []string {
	data, err := os.ReadFile(path)
	if err != nil {
		t.Logf("failed to read %s: %v", path, err)
		return nil
	}

	list := strings.TrimSpace(string(data))
	if list == "" {
		return nil
	}

	var set []string
	for _, rng := range strings.Split(list, ",") {
		bounds := strings.Split(rng, "-")
		if len(bounds) > 2 {
			t.Errorf("invalid %s range %q", kind, rng)
			return nil
		}

		lo, err := strconv.ParseUint(bounds[0], 10, 32)
		if err != nil {
			t.Errorf("failed to parse %s range %q: %v", kind, rng, err)
			return nil
		}

		// A single ID is treated as the range lo-lo.
		hi := lo
		if len(bounds) == 2 {
			hi, err = strconv.ParseUint(bounds[1], 10, 32)
			if err != nil {
				t.Errorf("failed to parse %s range %q: %v", kind, rng, err)
				return nil
			}
		}

		for id := lo; id <= hi; id++ {
			set = append(set, strconv.FormatUint(id, 10))
		}
	}

	return set
}
|