Merge pull request #8609 from samuelkarp/issue-8607
This commit is contained in:
commit
f92e576f6b
@ -24,6 +24,8 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/containerd/containerd/defaults"
|
||||
"github.com/containerd/containerd/pkg/atomicfile"
|
||||
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
@ -272,15 +274,14 @@ func WritePidFile(path string, pid int) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
|
||||
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
|
||||
f, err := atomicfile.New(path, 0o666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = fmt.Fprintf(f, "%d", pid)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
f.Cancel()
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, path)
|
||||
return f.Close()
|
||||
}
|
||||
|
148
pkg/atomicfile/file.go
Normal file
148
pkg/atomicfile/file.go
Normal file
@ -0,0 +1,148 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
Package atomicfile provides a mechanism (on Unix-like platforms) to present a consistent view of a file to separate
|
||||
processes even while the file is being written. This is accomplished by writing a temporary file, syncing to disk, and
|
||||
renaming over the destination file name.
|
||||
|
||||
Partial/inconsistent reads can occur due to:
|
||||
1. A process attempting to read the file while it is being written to (both in the case of a new file with a
|
||||
short/incomplete write or in the case of an existing, updated file where new bytes may be written at the beginning
|
||||
but old bytes may still be present after).
|
||||
2. Concurrent goroutines leading to multiple active writers of the same file.
|
||||
|
||||
The above mechanism explicitly protects against (1) as all writes are to a file with a temporary name.
|
||||
|
||||
There is no explicit protection against multiple, concurrent goroutines attempting to write the same file. However,
|
||||
atomically writing the file should mean only one writer will "win" and a consistent file will be visible.
|
||||
|
||||
Note: atomicfile is partially implemented for Windows. The Windows codepath performs the same operations, however
|
||||
Windows does not guarantee that a rename operation is atomic; a crash in the middle may leave the destination file
|
||||
truncated rather than with the expected content.
|
||||
*/
|
||||
package atomicfile
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// File is an io.ReadWriteCloser that can also be Canceled if a change needs to be abandoned.
|
||||
type File interface {
|
||||
io.ReadWriteCloser
|
||||
// Cancel abandons a change to a file. This can be called if a write fails or another error occurs.
|
||||
Cancel() error
|
||||
}
|
||||
|
||||
// ErrClosed is returned if Read or Write are called on a closed File.
|
||||
var ErrClosed = errors.New("file is closed")
|
||||
|
||||
// New returns a new atomic file. On Unix-like platforms, the writer (an io.ReadWriteCloser) is backed by a temporary
|
||||
// file placed into the same directory as the destination file (using filepath.Dir to split the directory from the
|
||||
// name). On a call to Close the temporary file is synced to disk and renamed to its final name, hiding any previous
|
||||
// file by the same name.
|
||||
//
|
||||
// Note: Take care to call Close and handle any errors that are returned. Errors returned from Close may indicate that
|
||||
// the file was not written with its final name.
|
||||
func New(name string, mode os.FileMode) (File, error) {
|
||||
return newFile(name, mode)
|
||||
}
|
||||
|
||||
type atomicFile struct {
|
||||
name string
|
||||
f *os.File
|
||||
closed bool
|
||||
closedMu sync.RWMutex
|
||||
}
|
||||
|
||||
func newFile(name string, mode os.FileMode) (File, error) {
|
||||
dir := filepath.Dir(name)
|
||||
f, err := os.CreateTemp(dir, "")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create temp file: %w", err)
|
||||
}
|
||||
if err := f.Chmod(mode); err != nil {
|
||||
return nil, fmt.Errorf("failed to change temp file permissions: %w", err)
|
||||
}
|
||||
return &atomicFile{name: name, f: f}, nil
|
||||
}
|
||||
|
||||
func (a *atomicFile) Close() (err error) {
|
||||
a.closedMu.Lock()
|
||||
defer a.closedMu.Unlock()
|
||||
|
||||
if a.closed {
|
||||
return nil
|
||||
}
|
||||
a.closed = true
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
_ = os.Remove(a.f.Name()) // ignore errors
|
||||
}
|
||||
}()
|
||||
// The order of operations here is:
|
||||
// 1. sync
|
||||
// 2. close
|
||||
// 3. rename
|
||||
// While the ordering of 2 and 3 is not important on Unix-like operating systems, Windows cannot rename an open
|
||||
// file. By closing first, we allow the rename operation to succeed.
|
||||
if err = a.f.Sync(); err != nil {
|
||||
return fmt.Errorf("failed to sync temp file %q: %w", a.f.Name(), err)
|
||||
}
|
||||
if err = a.f.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close temp file %q: %w", a.f.Name(), err)
|
||||
}
|
||||
if err = os.Rename(a.f.Name(), a.name); err != nil {
|
||||
return fmt.Errorf("failed to rename %q to %q: %w", a.f.Name(), a.name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *atomicFile) Cancel() error {
|
||||
a.closedMu.Lock()
|
||||
defer a.closedMu.Unlock()
|
||||
|
||||
if a.closed {
|
||||
return nil
|
||||
}
|
||||
a.closed = true
|
||||
_ = a.f.Close() // ignore error
|
||||
return os.Remove(a.f.Name())
|
||||
}
|
||||
|
||||
func (a *atomicFile) Read(p []byte) (n int, err error) {
|
||||
a.closedMu.RLock()
|
||||
defer a.closedMu.RUnlock()
|
||||
if a.closed {
|
||||
return 0, ErrClosed
|
||||
}
|
||||
return a.f.Read(p)
|
||||
}
|
||||
|
||||
func (a *atomicFile) Write(p []byte) (n int, err error) {
|
||||
a.closedMu.RLock()
|
||||
defer a.closedMu.RUnlock()
|
||||
if a.closed {
|
||||
return 0, ErrClosed
|
||||
}
|
||||
return a.f.Write(p)
|
||||
}
|
77
pkg/atomicfile/file_test.go
Normal file
77
pkg/atomicfile/file_test.go
Normal file
@ -0,0 +1,77 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package atomicfile
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestFile(t *testing.T) {
|
||||
const content = "this is some test content for a file"
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "test-file")
|
||||
|
||||
f, err := New(path, 0o644)
|
||||
require.NoError(t, err, "failed to create file")
|
||||
n, err := fmt.Fprint(f, content)
|
||||
assert.NoError(t, err, "failed to write content")
|
||||
assert.Equal(t, len(content), n, "written bytes should be equal")
|
||||
err = f.Close()
|
||||
require.NoError(t, err, "failed to close file")
|
||||
|
||||
actual, err := os.ReadFile(path)
|
||||
assert.NoError(t, err, "failed to read file")
|
||||
assert.Equal(t, content, string(actual))
|
||||
}
|
||||
|
||||
func TestConcurrentWrites(t *testing.T) {
|
||||
const content1 = "this is the first content of the file. there should be none other."
|
||||
const content2 = "the second content of the file should win!"
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "test-file")
|
||||
|
||||
file1, err := New(path, 0o600)
|
||||
require.NoError(t, err, "failed to create file1")
|
||||
file2, err := New(path, 0o644)
|
||||
require.NoError(t, err, "failed to create file2")
|
||||
|
||||
n, err := fmt.Fprint(file1, content1)
|
||||
assert.NoError(t, err, "failed to write content1")
|
||||
assert.Equal(t, len(content1), n, "written bytes should be equal")
|
||||
|
||||
n, err = fmt.Fprint(file2, content2)
|
||||
assert.NoError(t, err, "failed to write content2")
|
||||
assert.Equal(t, len(content2), n, "written bytes should be equal")
|
||||
|
||||
err = file1.Close()
|
||||
require.NoError(t, err, "failed to close file1")
|
||||
actual, err := os.ReadFile(path)
|
||||
assert.NoError(t, err, "failed to read file")
|
||||
assert.Equal(t, content1, string(actual))
|
||||
|
||||
err = file2.Close()
|
||||
require.NoError(t, err, "failed to close file2")
|
||||
actual, err = os.ReadFile(path)
|
||||
assert.NoError(t, err, "failed to read file")
|
||||
assert.Equal(t, content2, string(actual))
|
||||
}
|
@ -26,8 +26,10 @@ import (
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/log"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/pkg/atomicfile"
|
||||
)
|
||||
|
||||
// cniConfigTemplate contains the values containerd will overwrite
|
||||
@ -88,27 +90,8 @@ func (c *criService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateR
|
||||
log.G(ctx).Infof("CNI config is successfully loaded, skip generating cni config from template %q", confTemplate)
|
||||
return &runtime.UpdateRuntimeConfigResponse{}, nil
|
||||
}
|
||||
log.G(ctx).Infof("Generating cni config from template %q", confTemplate)
|
||||
// generate cni config file from the template with updated pod cidr.
|
||||
t, err := template.ParseFiles(confTemplate)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse cni config template %q: %w", confTemplate, err)
|
||||
}
|
||||
if err := os.MkdirAll(c.config.NetworkPluginConfDir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create cni config directory: %q: %w", c.config.NetworkPluginConfDir, err)
|
||||
}
|
||||
confFile := filepath.Join(c.config.NetworkPluginConfDir, cniConfigFileName)
|
||||
f, err := os.OpenFile(confFile, os.O_WRONLY|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open cni config file %q: %w", confFile, err)
|
||||
}
|
||||
defer f.Close()
|
||||
if err := t.Execute(f, cniConfigTemplate{
|
||||
PodCIDR: cidrs[0],
|
||||
PodCIDRRanges: cidrs,
|
||||
Routes: routes,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("failed to generate cni config file %q: %w", confFile, err)
|
||||
if err := writeCNIConfigFile(ctx, c.config.NetworkPluginConfDir, confTemplate, cidrs[0], cidrs, routes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtime.UpdateRuntimeConfigResponse{}, nil
|
||||
}
|
||||
@ -138,3 +121,28 @@ func getRoutes(cidrs []string) ([]string, error) {
|
||||
}
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
func writeCNIConfigFile(ctx context.Context, confDir string, confTemplate string, podCIDR string, podCIDRRanges []string, routes []string) error {
|
||||
log.G(ctx).Infof("Generating cni config from template %q", confTemplate)
|
||||
// generate cni config file from the template with updated pod cidr.
|
||||
t, err := template.ParseFiles(confTemplate)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse cni config template %q: %w", confTemplate, err)
|
||||
}
|
||||
if err := os.MkdirAll(confDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create cni config directory: %q: %w", confDir, err)
|
||||
}
|
||||
confFile := filepath.Join(confDir, cniConfigFileName)
|
||||
f, err := atomicfile.New(confFile, 0o644)
|
||||
defer func() {
|
||||
err = f.Close()
|
||||
}()
|
||||
if err := t.Execute(f, cniConfigTemplate{
|
||||
PodCIDR: podCIDR,
|
||||
PodCIDRRanges: podCIDRRanges,
|
||||
Routes: routes,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to generate cni config file %q: %w", confFile, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -26,8 +26,10 @@ import (
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/log"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/pkg/atomicfile"
|
||||
)
|
||||
|
||||
// cniConfigTemplate contains the values containerd will overwrite
|
||||
@ -89,27 +91,8 @@ func (c *criService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateR
|
||||
log.G(ctx).Infof("CNI config is successfully loaded, skip generating cni config from template %q", confTemplate)
|
||||
return &runtime.UpdateRuntimeConfigResponse{}, nil
|
||||
}
|
||||
log.G(ctx).Infof("Generating cni config from template %q", confTemplate)
|
||||
// generate cni config file from the template with updated pod cidr.
|
||||
t, err := template.ParseFiles(confTemplate)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse cni config template %q: %w", confTemplate, err)
|
||||
}
|
||||
if err := os.MkdirAll(c.config.NetworkPluginConfDir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create cni config directory: %q: %w", c.config.NetworkPluginConfDir, err)
|
||||
}
|
||||
confFile := filepath.Join(c.config.NetworkPluginConfDir, cniConfigFileName)
|
||||
f, err := os.OpenFile(confFile, os.O_WRONLY|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open cni config file %q: %w", confFile, err)
|
||||
}
|
||||
defer f.Close()
|
||||
if err := t.Execute(f, cniConfigTemplate{
|
||||
PodCIDR: cidrs[0],
|
||||
PodCIDRRanges: cidrs,
|
||||
Routes: routes,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("failed to generate cni config file %q: %w", confFile, err)
|
||||
if err := writeCNIConfigFile(ctx, c.config.NetworkPluginConfDir, confTemplate, cidrs[0], cidrs, routes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtime.UpdateRuntimeConfigResponse{}, nil
|
||||
}
|
||||
@ -139,3 +122,28 @@ func getRoutes(cidrs []string) ([]string, error) {
|
||||
}
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
func writeCNIConfigFile(ctx context.Context, confDir string, confTemplate string, podCIDR string, podCIDRRanges []string, routes []string) error {
|
||||
log.G(ctx).Infof("Generating cni config from template %q", confTemplate)
|
||||
// generate cni config file from the template with updated pod cidr.
|
||||
t, err := template.ParseFiles(confTemplate)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse cni config template %q: %w", confTemplate, err)
|
||||
}
|
||||
if err := os.MkdirAll(confDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create cni config directory: %q: %w", confDir, err)
|
||||
}
|
||||
confFile := filepath.Join(confDir, cniConfigFileName)
|
||||
f, err := atomicfile.New(confFile, 0o644)
|
||||
defer func() {
|
||||
err = f.Close()
|
||||
}()
|
||||
if err := t.Execute(f, cniConfigTemplate{
|
||||
PodCIDR: podCIDR,
|
||||
PodCIDRRanges: podCIDRRanges,
|
||||
Routes: routes,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to generate cni config file %q: %w", confFile, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -28,13 +28,15 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/protobuf/proto"
|
||||
"github.com/containerd/containerd/protobuf/types"
|
||||
"github.com/containerd/ttrpc"
|
||||
"github.com/containerd/typeurl/v2"
|
||||
exec "golang.org/x/sys/execabs"
|
||||
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/pkg/atomicfile"
|
||||
"github.com/containerd/containerd/protobuf/proto"
|
||||
"github.com/containerd/containerd/protobuf/types"
|
||||
)
|
||||
|
||||
type CommandConfig struct {
|
||||
@ -124,17 +126,16 @@ func WritePidFile(path string, pid int) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
|
||||
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
|
||||
f, err := atomicfile.New(path, 0o666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = fmt.Fprintf(f, "%d", pid)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
f.Cancel()
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, path)
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// WriteAddress writes a address file atomically
|
||||
@ -143,17 +144,16 @@ func WriteAddress(path, address string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
|
||||
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
|
||||
f, err := atomicfile.New(path, 0o666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.WriteString(address)
|
||||
f.Close()
|
||||
_, err = f.Write([]byte(address))
|
||||
if err != nil {
|
||||
f.Cancel()
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, path)
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// ErrNoAddress is returned when the address file has no content
|
||||
|
Loading…
Reference in New Issue
Block a user