deps: Bump cAdvisor to v0.46.0

Signed-off-by: David Porter <david@porter.me>
This commit is contained in:
David Porter
2022-11-08 18:49:14 -08:00
parent e62cfabf93
commit a2c4672163
46 changed files with 447 additions and 265 deletions

View File

@@ -9,6 +9,3 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/

View File

@@ -1,24 +0,0 @@
dist: bionic
language: go
go:
- "1.13.x"
- "1.15.x"
install:
# Don't change local go.{mod, sum} by go get tools.
#
# ref: https://github.com/golang/go/issues/27643
- pushd ..; go get -u github.com/vbatts/git-validation; popd
- pushd ..; go get -u github.com/kunalkushwaha/ltag; popd
before_script:
- pushd ..; git clone https://github.com/containerd/project; popd
script:
- DCO_VERBOSITY=-q ../project/script/validate/dco
- ../project/script/validate/fileheader ../project/
- go test -v -race -covermode=atomic -coverprofile=coverage.txt ./...
after_success:
- bash <(curl -s https://codecov.io/bash)

View File

@@ -1,6 +1,7 @@
# ttrpc
[![Build Status](https://travis-ci.org/containerd/ttrpc.svg?branch=master)](https://travis-ci.org/containerd/ttrpc)
[![Build Status](https://github.com/containerd/ttrpc/workflows/CI/badge.svg)](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI)
[![codecov](https://codecov.io/gh/containerd/ttrpc/branch/main/graph/badge.svg)](https://codecov.io/gh/containerd/ttrpc)
GRPC for low-memory environments.
@@ -40,13 +41,8 @@ directly, if required.
# Status
Very new. YMMV.
TODO:
- [X] Plumb error codes and GRPC status
- [X] Remove use of any type and dependency on typeurl package
- [X] Ensure that protocol can support streaming in the future
- [ ] Document protocol layout
- [ ] Add testing under concurrent load to ensure
- [ ] Verify connection error handling
@@ -55,8 +51,8 @@ TODO:
ttrpc is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE).
As a containerd sub-project, you will find the:
* [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md),
* [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS),
* and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md)
* [Project governance](https://github.com/containerd/project/blob/main/GOVERNANCE.md),
* [Maintainers](https://github.com/containerd/project/blob/main/MAINTAINERS),
* and [Contributing guidelines](https://github.com/containerd/project/blob/main/CONTRIBUTING.md)
information in our [`containerd/project`](https://github.com/containerd/project) repository.

View File

@@ -19,11 +19,11 @@ package ttrpc
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"net"
"sync"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -105,7 +105,7 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
if mh.Length > uint32(messageLengthMax) {
if _, err := ch.br.Discard(int(mh.Length)); err != nil {
return mh, nil, errors.Wrapf(err, "failed to discard after receiving oversized message")
return mh, nil, fmt.Errorf("failed to discard after receiving oversized message: %w", err)
}
return mh, nil, status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", mh.Length, messageLengthMax)
@@ -113,7 +113,7 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
p := ch.getmbuf(int(mh.Length))
if _, err := io.ReadFull(ch.br, p); err != nil {
return messageHeader{}, nil, errors.Wrapf(err, "failed reading message")
return messageHeader{}, nil, fmt.Errorf("failed reading message: %w", err)
}
return mh, p, nil

View File

@@ -18,6 +18,7 @@ package ttrpc
import (
"context"
"errors"
"io"
"net"
"os"
@@ -27,7 +28,6 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -194,72 +194,131 @@ type message struct {
err error
}
type receiver struct {
wg *sync.WaitGroup
messages chan *message
err error
// callMap provides access to a map of active calls, guarded by a mutex.
type callMap struct {
m sync.Mutex
activeCalls map[uint32]*callRequest
closeErr error
}
func (r *receiver) run(ctx context.Context, c *channel) {
defer r.wg.Done()
for {
select {
case <-ctx.Done():
r.err = ctx.Err()
return
default:
mh, p, err := c.recv()
if err != nil {
_, ok := status.FromError(err)
if !ok {
// treat all errors that are not an rpc status as terminal.
// all others poison the connection.
r.err = filterCloseErr(err)
return
}
}
select {
case r.messages <- &message{
messageHeader: mh,
p: p[:mh.Length],
err: err,
}:
case <-ctx.Done():
r.err = ctx.Err()
return
}
}
// newCallMap returns a new callMap with an empty set of active calls.
func newCallMap() *callMap {
return &callMap{
activeCalls: make(map[uint32]*callRequest),
}
}
// set adds a call entry to the map with the given streamID key.
func (cm *callMap) set(streamID uint32, cr *callRequest) error {
cm.m.Lock()
defer cm.m.Unlock()
if cm.closeErr != nil {
return cm.closeErr
}
cm.activeCalls[streamID] = cr
return nil
}
// get looks up the call entry for the given streamID key, then removes it
// from the map and returns it.
func (cm *callMap) get(streamID uint32) (cr *callRequest, ok bool, err error) {
cm.m.Lock()
defer cm.m.Unlock()
if cm.closeErr != nil {
return nil, false, cm.closeErr
}
cr, ok = cm.activeCalls[streamID]
if ok {
delete(cm.activeCalls, streamID)
}
return
}
// abort sends the given error to each active call, and clears the map.
// Once abort has been called, any subsequent calls to the callMap will return the error passed to abort.
func (cm *callMap) abort(err error) error {
cm.m.Lock()
defer cm.m.Unlock()
if cm.closeErr != nil {
return cm.closeErr
}
for streamID, call := range cm.activeCalls {
call.errs <- err
delete(cm.activeCalls, streamID)
}
cm.closeErr = err
return nil
}
func (c *Client) run() {
var (
streamID uint32 = 1
waiters = make(map[uint32]*callRequest)
calls = c.calls
incoming = make(chan *message)
receiversDone = make(chan struct{})
wg sync.WaitGroup
waiters = newCallMap()
receiverDone = make(chan struct{})
)
// broadcast the shutdown error to the remaining waiters.
abortWaiters := func(wErr error) {
for _, waiter := range waiters {
waiter.errs <- wErr
}
}
recv := &receiver{
wg: &wg,
messages: incoming,
}
wg.Add(1)
// Sender goroutine
// Receives calls from dispatch, adds them to the set of active calls, and sends them
// to the server.
go func() {
wg.Wait()
close(receiversDone)
var streamID uint32 = 1
for {
select {
case <-c.ctx.Done():
return
case call := <-c.calls:
id := streamID
streamID += 2 // enforce odd client initiated request ids
if err := waiters.set(id, call); err != nil {
call.errs <- err // errs is buffered so should not block.
continue
}
if err := c.send(id, messageTypeRequest, call.req); err != nil {
call.errs <- err // errs is buffered so should not block.
waiters.get(id) // remove from waiters set
}
}
}
}()
// Receiver goroutine
// Receives responses from the server, looks up the call info in the set of active calls,
// and notifies the caller of the response.
go func() {
defer close(receiverDone)
for {
select {
case <-c.ctx.Done():
c.setError(c.ctx.Err())
return
default:
mh, p, err := c.channel.recv()
if err != nil {
_, ok := status.FromError(err)
if !ok {
// treat all errors that are not an rpc status as terminal.
// all others poison the connection.
c.setError(filterCloseErr(err))
return
}
}
msg := &message{
messageHeader: mh,
p: p[:mh.Length],
err: err,
}
call, ok, err := waiters.get(mh.StreamID)
if err != nil {
logrus.Errorf("ttrpc: failed to look up active call: %s", err)
continue
}
if !ok {
logrus.Errorf("ttrpc: received message for unknown channel %v", mh.StreamID)
continue
}
call.errs <- c.recv(call.resp, msg)
}
}
}()
go recv.run(c.ctx, c.channel)
defer func() {
c.conn.Close()
@@ -269,32 +328,14 @@ func (c *Client) run() {
for {
select {
case call := <-calls:
if err := c.send(streamID, messageTypeRequest, call.req); err != nil {
call.errs <- err
continue
}
waiters[streamID] = call
streamID += 2 // enforce odd client initiated request ids
case msg := <-incoming:
call, ok := waiters[msg.StreamID]
if !ok {
logrus.Errorf("ttrpc: received message for unknown channel %v", msg.StreamID)
continue
}
call.errs <- c.recv(call.resp, msg)
delete(waiters, msg.StreamID)
case <-receiversDone:
// all the receivers have exited
if recv.err != nil {
c.setError(recv.err)
}
case <-receiverDone:
// The receiver has exited.
// don't return out, let the close of the context trigger the abort of waiters
c.Close()
case <-c.ctx.Done():
abortWaiters(c.error())
// Abort all active calls. This will also prevent any new calls from being added
// to waiters.
waiters.abort(c.error())
return
}
}
@@ -347,7 +388,7 @@ func filterCloseErr(err error) error {
return nil
case err == io.EOF:
return ErrClosed
case errors.Cause(err) == io.EOF:
case errors.Is(err, io.EOF):
return ErrClosed
case strings.Contains(err.Error(), "use of closed network connection"):
return ErrClosed

View File

@@ -17,8 +17,9 @@
package ttrpc
import (
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)
type codec struct{}
@@ -28,7 +29,7 @@ func (c codec) Marshal(msg interface{}) ([]byte, error) {
case proto.Message:
return proto.Marshal(v)
default:
return nil, errors.Errorf("ttrpc: cannot marshal unknown type: %T", msg)
return nil, fmt.Errorf("ttrpc: cannot marshal unknown type: %T", msg)
}
}
@@ -37,6 +38,6 @@ func (c codec) Unmarshal(p []byte, msg interface{}) error {
case proto.Message:
return proto.Unmarshal(p, v)
default:
return errors.Errorf("ttrpc: cannot unmarshal into unknown type: %T", msg)
return fmt.Errorf("ttrpc: cannot unmarshal into unknown type: %T", msg)
}
}

View File

@@ -16,7 +16,7 @@
package ttrpc
import "github.com/pkg/errors"
import "errors"
type serverConfig struct {
handshaker Handshaker

View File

@@ -18,6 +18,7 @@ package ttrpc
import (
"context"
"errors"
"io"
"math/rand"
"net"
@@ -25,7 +26,6 @@ import (
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

View File

@@ -18,13 +18,14 @@ package ttrpc
import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"unsafe"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -51,7 +52,7 @@ func newServiceSet(interceptor UnaryServerInterceptor) *serviceSet {
func (s *serviceSet) register(name string, methods map[string]Method) {
if _, ok := s.services[name]; ok {
panic(errors.Errorf("duplicate service %v registered", name))
panic(fmt.Errorf("duplicate service %v registered", name))
}
s.services[name] = ServiceDesc{
@@ -116,12 +117,12 @@ func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName strin
func (s *serviceSet) resolve(service, method string) (Method, error) {
srv, ok := s.services[service]
if !ok {
return nil, status.Errorf(codes.NotFound, "service %v", service)
return nil, status.Errorf(codes.Unimplemented, "service %v", service)
}
mthd, ok := srv.Methods[method]
if !ok {
return nil, status.Errorf(codes.NotFound, "method %v", method)
return nil, status.Errorf(codes.Unimplemented, "method %v", method)
}
return mthd, nil

View File

@@ -18,11 +18,12 @@ package ttrpc
import (
"context"
"errors"
"fmt"
"net"
"os"
"syscall"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
@@ -31,12 +32,12 @@ type UnixCredentialsFunc func(*unix.Ucred) error
func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) {
uc, err := requireUnixSocket(conn)
if err != nil {
return nil, nil, errors.Wrap(err, "ttrpc.UnixCredentialsFunc: require unix socket")
return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: require unix socket: %w", err)
}
rs, err := uc.SyscallConn()
if err != nil {
return nil, nil, errors.Wrap(err, "ttrpc.UnixCredentialsFunc: (net.UnixConn).SyscallConn failed")
return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: (net.UnixConn).SyscallConn failed: %w", err)
}
var (
ucred *unix.Ucred
@@ -45,15 +46,15 @@ func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net
if err := rs.Control(func(fd uintptr) {
ucred, ucredErr = unix.GetsockoptUcred(int(fd), unix.SOL_SOCKET, unix.SO_PEERCRED)
}); err != nil {
return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: (*syscall.RawConn).Control failed")
return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: (*syscall.RawConn).Control failed: %w", err)
}
if ucredErr != nil {
return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials")
return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials: %w", err)
}
if err := fn(ucred); err != nil {
return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: credential check failed")
return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: credential check failed: %w", err)
}
return uc, ucred, nil
@@ -93,7 +94,7 @@ func requireRoot(ucred *unix.Ucred) error {
func requireUidGid(ucred *unix.Ucred, uid, gid int) error {
if (uid != -1 && uint32(uid) != ucred.Uid) || (gid != -1 && uint32(gid) != ucred.Gid) {
return errors.Wrap(syscall.EPERM, "ttrpc: invalid credentials")
return fmt.Errorf("ttrpc: invalid credentials: %v", syscall.EPERM)
}
return nil
}