Merge pull request #9743 from klihub/fixes/nri-fd-double-close

go.{mod,sum}: update NRI dependency, fixing a potential fd double close error.
This commit is contained in:
Fu Wei 2024-02-07 08:15:40 +00:00 committed by GitHub
commit 805ed8e871
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 232 additions and 309 deletions

2
go.mod
View File

@ -17,7 +17,7 @@ require (
github.com/containerd/go-cni v1.1.9 github.com/containerd/go-cni v1.1.9
github.com/containerd/go-runc v1.1.0 github.com/containerd/go-runc v1.1.0
github.com/containerd/log v0.1.0 github.com/containerd/log v0.1.0
github.com/containerd/nri v0.5.0 github.com/containerd/nri v0.6.0
github.com/containerd/platforms v0.1.1 github.com/containerd/platforms v0.1.1
github.com/containerd/plugin v0.1.0 github.com/containerd/plugin v0.1.0
github.com/containerd/ttrpc v1.2.3 github.com/containerd/ttrpc v1.2.3

4
go.sum
View File

@ -56,8 +56,8 @@ github.com/containerd/go-runc v1.1.0 h1:OX4f+/i2y5sUT7LhmcJH7GYrjjhHa1QI4e8yO0gG
github.com/containerd/go-runc v1.1.0/go.mod h1:xJv2hFF7GvHtTJd9JqTS2UVxMkULUYw4JN5XAUZqH5U= github.com/containerd/go-runc v1.1.0/go.mod h1:xJv2hFF7GvHtTJd9JqTS2UVxMkULUYw4JN5XAUZqH5U=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/containerd/nri v0.5.0 h1:bwCtKpi8i5FCA8g8WjIZNod91CEfIloYpV0+TH2prnQ= github.com/containerd/nri v0.6.0 h1:hdztxwL0gCS1CrCa9bvD1SoJiFN4jBuRQhplCvCPMj8=
github.com/containerd/nri v0.5.0/go.mod h1:qIu2NlP3r/qK4YGnNuQf0De4VPqQWP2i2CVBfAZbGzg= github.com/containerd/nri v0.6.0/go.mod h1:F7OZfO4QTPqw5r87aq+syZJwiVvRYLIlHZiZDBV1W3A=
github.com/containerd/platforms v0.1.1 h1:gp0xXBoY+1CjH54gJDon0kBjIbK2C4XSX1BGwP5ptG0= github.com/containerd/platforms v0.1.1 h1:gp0xXBoY+1CjH54gJDon0kBjIbK2C4XSX1BGwP5ptG0=
github.com/containerd/platforms v0.1.1/go.mod h1:XOM2BS6kN6gXafPLg80V6y/QUib+xoLyC3qVmHzibko= github.com/containerd/platforms v0.1.1/go.mod h1:XOM2BS6kN6gXafPLg80V6y/QUib+xoLyC3qVmHzibko=
github.com/containerd/plugin v0.1.0 h1:CYMyZk9beRAIe1FEKItbMLLAz/z16aXrGc+B+nv0fU4= github.com/containerd/plugin v0.1.0 h1:CYMyZk9beRAIe1FEKItbMLLAz/z16aXrGc+B+nv0fU4=

View File

@ -81,7 +81,7 @@ type process interface {
Pid() uint32 Pid() uint32
} }
// Task is ta subset of containerd's Task interface. // Task is a subset of containerd's Task interface.
type Task interface { type Task interface {
process process

View File

@ -29,6 +29,7 @@ import (
"github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/log" "github.com/containerd/nri/pkg/log"
"github.com/containerd/ttrpc"
) )
const ( const (
@ -60,6 +61,8 @@ type Adaptation struct {
dontListen bool dontListen bool
syncFn SyncFn syncFn SyncFn
updateFn UpdateFn updateFn UpdateFn
clientOpts []ttrpc.ClientOpts
serverOpts []ttrpc.ServerOpt
listener net.Listener listener net.Listener
plugins []*plugin plugins []*plugin
} }
@ -104,6 +107,15 @@ func WithDisabledExternalConnections() Option {
} }
} }
// WithTTRPCOptions sets extra client and server options to use for ttrpc.
func WithTTRPCOptions(clientOpts []ttrpc.ClientOpts, serverOpts []ttrpc.ServerOpt) Option {
return func(r *Adaptation) error {
r.clientOpts = append(r.clientOpts, clientOpts...)
r.serverOpts = append(r.serverOpts, serverOpts...)
return nil
}
}
// New creates a new NRI Runtime. // New creates a new NRI Runtime.
func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option) (*Adaptation, error) { func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option) (*Adaptation, error) {
var err error var err error

View File

@ -198,12 +198,16 @@ func (p *plugin) connect(conn stdnet.Conn) (retErr error) {
if err != nil { if err != nil {
return fmt.Errorf("failed to mux plugin connection for plugin %q: %w", p.name(), err) return fmt.Errorf("failed to mux plugin connection for plugin %q: %w", p.name(), err)
} }
rpcc := ttrpc.NewClient(pconn, ttrpc.WithOnClose(
clientOpts := []ttrpc.ClientOpts{
ttrpc.WithOnClose(
func() { func() {
log.Infof(noCtx, "connection to plugin %q closed", p.name()) log.Infof(noCtx, "connection to plugin %q closed", p.name())
close(p.closeC) close(p.closeC)
p.close() p.close()
})) }),
}
rpcc := ttrpc.NewClient(pconn, append(clientOpts, p.r.clientOpts...)...)
defer func() { defer func() {
if retErr != nil { if retErr != nil {
rpcc.Close() rpcc.Close()
@ -211,7 +215,7 @@ func (p *plugin) connect(conn stdnet.Conn) (retErr error) {
}() }()
stub := api.NewPluginClient(rpcc) stub := api.NewPluginClient(rpcc)
rpcs, err := ttrpc.NewServer() rpcs, err := ttrpc.NewServer(p.r.serverOpts...)
if err != nil { if err != nil {
return fmt.Errorf("failed to create ttrpc server for plugin %q: %w", p.name(), err) return fmt.Errorf("failed to create ttrpc server for plugin %q: %w", p.name(), err)
} }

93
vendor/github.com/containerd/nri/pkg/net/socketpair.go generated vendored Normal file
View File

@ -0,0 +1,93 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"fmt"
"net"
"os"
)
// SocketPair contains the os.File of a connected pair of sockets.
type SocketPair struct {
local, peer *os.File
}
// NewSocketPair returns a connected pair of sockets.
func NewSocketPair() (SocketPair, error) {
fds, err := newSocketPairCLOEXEC()
if err != nil {
return SocketPair{nil, nil}, fmt.Errorf("failed to create socketpair: %w", err)
}
filename := fmt.Sprintf("socketpair-#%d:%d", fds[0], fds[1])
return SocketPair{
os.NewFile(uintptr(fds[0]), filename+"[0]"),
os.NewFile(uintptr(fds[1]), filename+"[1]"),
}, nil
}
// LocalFile returns the local end of the socketpair as an *os.File.
func (sp SocketPair) LocalFile() *os.File {
return sp.local
}
// PeerFile returns the peer end of the socketpair as an *os.File.
func (sp SocketPair) PeerFile() *os.File {
return sp.peer
}
// LocalConn returns a net.Conn for the local end of the socketpair.
// This closes LocalFile().
func (sp SocketPair) LocalConn() (net.Conn, error) {
file := sp.LocalFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s: %w", file.Name(), err)
}
return conn, nil
}
// PeerConn returns a net.Conn for the peer end of the socketpair.
// This closes PeerFile().
func (sp SocketPair) PeerConn() (net.Conn, error) {
file := sp.PeerFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s: %w", file.Name(), err)
}
return conn, nil
}
// Close closes both ends of the socketpair.
func (sp SocketPair) Close() {
sp.LocalClose()
sp.PeerClose()
}
// LocalClose closes the local end of the socketpair.
func (sp SocketPair) LocalClose() {
sp.local.Close()
}
// PeerClose closes the peer end of the socketpair.
func (sp SocketPair) PeerClose() {
sp.peer.Close()
}

View File

@ -0,0 +1,27 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"golang.org/x/sys/unix"
)
func newSocketPairCLOEXEC() ([2]int, error) {
return unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM|unix.SOCK_CLOEXEC, 0)
}

View File

@ -0,0 +1,38 @@
//go:build !linux && !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"syscall"
"golang.org/x/sys/unix"
)
func newSocketPairCLOEXEC() ([2]int, error) {
syscall.ForkLock.RLock()
defer syscall.ForkLock.RUnlock()
fds, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM, 0)
if err != nil {
return fds, err
}
unix.CloseOnExec(fds[0])
unix.CloseOnExec(fds[1])
return fds, err
}

View File

@ -0,0 +1,30 @@
//go:build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"errors"
sys "golang.org/x/sys/windows"
)
func newSocketPairCLOEXEC() ([2]sys.Handle, error) {
// when implementing do use WSA_FLAG_NO_HANDLE_INHERIT to avoid leaking FDs
return [2]sys.Handle{sys.InvalidHandle, sys.InvalidHandle}, errors.New("newSocketPairCLOEXEC unimplemented for windows")
}

View File

@ -1,97 +0,0 @@
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"fmt"
"net"
"os"
syscall "golang.org/x/sys/unix"
)
const (
local = 0
peer = 1
)
// SocketPair contains the file descriptors of a connected pair of sockets.
type SocketPair [2]int
// NewSocketPair returns a connected pair of sockets.
func NewSocketPair() (SocketPair, error) {
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
if err != nil {
return [2]int{-1, -1}, fmt.Errorf("failed to create socketpair: %w", err)
}
return fds, nil
}
// LocalFile returns the socketpair fd for local usage as an *os.File.
func (fds SocketPair) LocalFile() *os.File {
return os.NewFile(uintptr(fds[local]), fds.fileName()+"[0]")
}
// PeerFile returns the socketpair fd for peer usage as an *os.File.
func (fds SocketPair) PeerFile() *os.File {
return os.NewFile(uintptr(fds[peer]), fds.fileName()+"[1]")
}
// LocalConn returns a net.Conn for the local end of the socketpair.
func (fds SocketPair) LocalConn() (net.Conn, error) {
file := fds.LocalFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s[0]: %w", fds.fileName(), err)
}
return conn, nil
}
// PeerConn returns a net.Conn for the peer end of the socketpair.
func (fds SocketPair) PeerConn() (net.Conn, error) {
file := fds.PeerFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s[1]: %w", fds.fileName(), err)
}
return conn, nil
}
// Close closes both ends of the socketpair.
func (fds SocketPair) Close() {
fds.LocalClose()
fds.PeerClose()
}
// LocalClose closes the local end of the socketpair.
func (fds SocketPair) LocalClose() {
syscall.Close(fds[local])
}
// PeerClose closes the peer end of the socketpair.
func (fds SocketPair) PeerClose() {
syscall.Close(fds[peer])
}
func (fds SocketPair) fileName() string {
return fmt.Sprintf("socketpair-#%d:%d[0]", fds[local], fds[peer])
}

View File

@ -1,197 +0,0 @@
//go:build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"fmt"
"net"
"os"
"unsafe"
sys "golang.org/x/sys/windows"
)
// SocketPair contains a connected pair of sockets.
type SocketPair [2]sys.Handle
const (
local = 0
peer = 1
)
// NewSocketPair returns a connected pair of sockets.
func NewSocketPair() (SocketPair, error) {
/* return [2]sys.Handle{sys.InvalidHandle, sys.InvalidHandle},
errors.New("failed to emulatesocketpair, unimplemented for windows")*/
// untested: return emulateWithPreConnect()
return emulateWithPreConnect()
}
func emulateWithPreConnect() (SocketPair, error) {
var (
invalid = SocketPair{sys.InvalidHandle, sys.InvalidHandle}
sa sys.SockaddrInet4
//sn sys.Sockaddr
l sys.Handle
a sys.Handle
p sys.Handle
err error
)
l, err = socket(sys.AF_INET, sys.SOCK_STREAM, 0)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (local Socket()): %w", err)
}
defer func() {
if err != nil {
sys.CloseHandle(l)
}
}()
sa.Addr[0] = 127
sa.Addr[3] = 1
sa.Port = 9999
err = sys.Bind(l, &sa)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (Bind()): %w", err)
}
/*sn, err = sys.Getsockname(l)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (Getsockname()): %w", err)
}*/
p, err = socket(sys.AF_INET, sys.SOCK_STREAM, 0)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (peer Socket()): %w", err)
}
defer func() {
if err != nil {
sys.CloseHandle(p)
}
}()
err = sys.Listen(l, 0)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (Listen()): %w", err)
}
go func() {
err = connect(p, &sa)
}()
a, err = accept(l, sys.AF_INET, sys.SOCK_STREAM, 0)
if err != nil {
return invalid, fmt.Errorf("failed to emualte socketpair (Accept()): %w", err)
}
defer func() {
if err != nil {
sys.CloseHandle(a)
}
}()
sys.CloseHandle(l)
return SocketPair{a, p}, nil
}
// Close closes both ends of the socketpair.
func (sp SocketPair) Close() {
sp.LocalClose()
sp.PeerClose()
}
// LocalFile returns the socketpair fd for local usage as an *os.File.
func (sp SocketPair) LocalFile() *os.File {
return os.NewFile(uintptr(sp[local]), sp.fileName()+"[0]")
}
// PeerFile returns the socketpair fd for peer usage as an *os.File.
func (sp SocketPair) PeerFile() *os.File {
return os.NewFile(uintptr(sp[peer]), sp.fileName()+"[1]")
}
// LocalConn returns a net.Conn for the local end of the socketpair.
func (sp SocketPair) LocalConn() (net.Conn, error) {
file := sp.LocalFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s[0]: %w", sp.fileName(), err)
}
return conn, nil
}
// PeerConn returns a net.Conn for the peer end of the socketpair.
func (sp SocketPair) PeerConn() (net.Conn, error) {
file := sp.PeerFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s[1]: %w", sp.fileName(), err)
}
return conn, nil
}
// LocalClose closes the local end of the socketpair.
func (sp SocketPair) LocalClose() {
sys.CloseHandle(sp[local])
}
// PeerClose closes the peer end of the socketpair.
func (sp SocketPair) PeerClose() {
sys.CloseHandle(sp[peer])
}
func (sp SocketPair) fileName() string {
return fmt.Sprintf("socketpair-#%d:%d[0]", sp[local], sp[peer])
}
func socket(domain, typ, proto int) (sys.Handle, error) {
return sys.WSASocket(int32(domain), int32(typ), int32(proto), nil, 0, sys.WSA_FLAG_OVERLAPPED)
}
func connect(s sys.Handle, sa sys.Sockaddr) error {
o := &sys.Overlapped{}
return sys.ConnectEx(s, sa, nil, 0, nil, o)
}
func accept(l sys.Handle, domain, typ, proto int) (sys.Handle, error) {
var (
a sys.Handle
err error
buf = [1024]byte{}
overlap = sys.Overlapped{}
cnt = uint32(16 + 256)
)
a, err = socket(sys.AF_INET, sys.SOCK_STREAM, 0)
if err != nil {
return sys.InvalidHandle, fmt.Errorf("failed to emulate socketpair (accept): %w", err)
}
err = sys.AcceptEx(l, a, (*byte)(unsafe.Pointer(&buf)), 0, cnt, cnt, &cnt, &overlap)
if err != nil {
return sys.InvalidHandle, fmt.Errorf("failed to emulate socketpair (AcceptEx()): %w", err)
}
return a, nil
}

View File

@ -227,6 +227,15 @@ func WithDialer(d func(string) (stdnet.Conn, error)) Option {
} }
} }
// WithTTRPCOptions sets extra client and server options to use for ttrpc .
func WithTTRPCOptions(clientOpts []ttrpc.ClientOpts, serverOpts []ttrpc.ServerOpt) Option {
return func(s *stub) error {
s.clientOpts = append(s.clientOpts, clientOpts...)
s.serverOpts = append(s.serverOpts, serverOpts...)
return nil
}
}
// stub implements Stub. // stub implements Stub.
type stub struct { type stub struct {
sync.Mutex sync.Mutex
@ -239,6 +248,8 @@ type stub struct {
dialer func(string) (stdnet.Conn, error) dialer func(string) (stdnet.Conn, error)
conn stdnet.Conn conn stdnet.Conn
onClose func() onClose func()
serverOpts []ttrpc.ServerOpt
clientOpts []ttrpc.ClientOpts
rpcm multiplex.Mux rpcm multiplex.Mux
rpcl stdnet.Listener rpcl stdnet.Listener
rpcs *ttrpc.Server rpcs *ttrpc.Server
@ -334,7 +345,7 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
} }
}() }()
rpcs, err := ttrpc.NewServer() rpcs, err := ttrpc.NewServer(stub.serverOpts...)
if err != nil { if err != nil {
return fmt.Errorf("failed to create ttrpc server: %w", err) return fmt.Errorf("failed to create ttrpc server: %w", err)
} }
@ -351,11 +362,13 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
if err != nil { if err != nil {
return fmt.Errorf("failed to multiplex ttrpc client connection: %w", err) return fmt.Errorf("failed to multiplex ttrpc client connection: %w", err)
} }
rpcc := ttrpc.NewClient(conn,
clientOpts := []ttrpc.ClientOpts{
ttrpc.WithOnClose(func() { ttrpc.WithOnClose(func() {
stub.connClosed() stub.connClosed()
}), }),
) }
rpcc := ttrpc.NewClient(conn, append(clientOpts, stub.clientOpts...)...)
defer func() { defer func() {
if retErr != nil { if retErr != nil {
rpcc.Close() rpcc.Close()

2
vendor/modules.txt vendored
View File

@ -128,7 +128,7 @@ github.com/containerd/go-runc
## explicit; go 1.20 ## explicit; go 1.20
github.com/containerd/log github.com/containerd/log
github.com/containerd/log/logtest github.com/containerd/log/logtest
# github.com/containerd/nri v0.5.0 # github.com/containerd/nri v0.6.0
## explicit; go 1.19 ## explicit; go 1.19
github.com/containerd/nri github.com/containerd/nri
github.com/containerd/nri/pkg/adaptation github.com/containerd/nri/pkg/adaptation