use typeurl funcs for marshalling anypb.Any

Signed-off-by: Akhil Mohan <akhilerm@gmail.com>
This commit is contained in:
Akhil Mohan
2024-07-08 21:48:31 +05:30
parent cbb2fc78e9
commit 300fd770a0
36 changed files with 109 additions and 164 deletions

View File

@@ -24,11 +24,10 @@ import (
"github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/pkg/epoch"
"github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/containerd/v2/pkg/protobuf"
ptypes "github.com/containerd/containerd/v2/pkg/protobuf/types"
"github.com/containerd/errdefs"
"github.com/containerd/typeurl/v2"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -54,7 +53,7 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
payloads := make(map[string]*ptypes.Any)
for k, v := range config.ProcessorPayloads {
payloads[k] = protobuf.FromAny(v)
payloads[k] = typeurl.MarshalProto(v)
}
req := &diffapi.ApplyRequest{

View File

@@ -28,7 +28,6 @@ import (
"os/exec"
"sync"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/containerd/v2/pkg/protobuf/proto"
"github.com/containerd/typeurl/v2"
)
@@ -41,7 +40,7 @@ func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProce
var payloadC io.Closer
if payload != nil {
pb := protobuf.FromAny(payload)
pb := typeurl.MarshalProto(payload)
data, err := proto.Marshal(pb)
if err != nil {
return nil, err

View File

@@ -29,7 +29,6 @@ import (
"github.com/Microsoft/go-winio"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/containerd/v2/pkg/protobuf/proto"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
@@ -44,7 +43,7 @@ func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProce
cmd.Env = append(cmd.Env, env...)
if payload != nil {
pb := protobuf.FromAny(payload)
pb := typeurl.MarshalProto(payload)
data, err := proto.Marshal(pb)
if err != nil {
return nil, err

View File

@@ -70,7 +70,7 @@ func (p *grpcEventsProxy) Publish(ctx context.Context, topic string, event event
}
req := &api.PublishRequest{
Topic: topic,
Event: protobuf.FromAny(evt),
Event: typeurl.MarshalProto(evt),
}
if _, err := p.client.Publish(ctx, req); err != nil {
return errdefs.FromGRPC(err)
@@ -84,7 +84,7 @@ func (p *grpcEventsProxy) Forward(ctx context.Context, envelope *events.Envelope
Timestamp: protobuf.ToTimestamp(envelope.Timestamp),
Namespace: envelope.Namespace,
Topic: envelope.Topic,
Event: protobuf.FromAny(envelope.Event),
Event: typeurl.MarshalProto(envelope.Event),
},
}
if _, err := p.client.Forward(ctx, req); err != nil {
@@ -151,7 +151,7 @@ func (p *ttrpcEventsProxy) Publish(ctx context.Context, topic string, event even
}
req := &api.PublishRequest{
Topic: topic,
Event: protobuf.FromAny(evt),
Event: typeurl.MarshalProto(evt),
}
if _, err := p.client.Publish(ctx, req); err != nil {
return errdefs.FromGRPC(err)
@@ -165,7 +165,7 @@ func (p *ttrpcEventsProxy) Forward(ctx context.Context, envelope *events.Envelop
Timestamp: protobuf.ToTimestamp(envelope.Timestamp),
Namespace: envelope.Namespace,
Topic: envelope.Topic,
Event: protobuf.FromAny(envelope.Event),
Event: typeurl.MarshalProto(envelope.Event),
},
}
if _, err := p.client.Forward(ctx, req); err != nil {

View File

@@ -22,10 +22,10 @@ import (
api "github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/v2/core/introspection"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl/v2"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/emptypb"
@@ -79,7 +79,7 @@ func (i *introspectionRemote) Server(ctx context.Context) (*api.ServerResponse,
func (i *introspectionRemote) PluginInfo(ctx context.Context, pluginType, id string, options any) (resp *api.PluginInfoResponse, err error) {
var optionsPB *anypb.Any
if options != nil {
optionsPB, err = protobuf.MarshalAnyToProto(options)
optionsPB, err = typeurl.MarshalAnyToProto(options)
if err != nil {
return nil, fmt.Errorf("failed to marshal runtime requst: %w", err)
}

View File

@@ -20,7 +20,6 @@ import (
"fmt"
"time"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/containerd/v2/pkg/protobuf/proto"
"github.com/containerd/containerd/v2/pkg/protobuf/types"
"github.com/containerd/typeurl/v2"
@@ -164,7 +163,7 @@ func WriteExtensions(bkt *bolt.Bucket, extensions map[string]typeurl.Any) error
}
for name, ext := range extensions {
ext := protobuf.FromAny(ext)
ext := typeurl.MarshalProto(ext)
p, err := proto.Marshal(ext)
if err != nil {
return err
@@ -206,7 +205,7 @@ func ReadExtensions(bkt *bolt.Bucket) (map[string]typeurl.Any, error) {
// WriteAny write a protobuf's Any type to the bucket
func WriteAny(bkt *bolt.Bucket, name []byte, any typeurl.Any) error {
pbany := protobuf.FromAny(any)
pbany := typeurl.MarshalProto(any)
if pbany == nil {
return nil
}

View File

@@ -28,7 +28,6 @@ import (
"github.com/containerd/containerd/v2/core/containers"
"github.com/containerd/containerd/v2/pkg/filters"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/containerd/v2/pkg/protobuf/types"
"github.com/containerd/errdefs"
"github.com/containerd/log/logtest"
@@ -48,7 +47,7 @@ func TestContainersList(t *testing.T) {
ctx, db := testEnv(t)
store := NewContainerStore(NewDB(db, nil, nil))
spec := &specs.Spec{}
encoded, err := protobuf.MarshalAnyToProto(spec)
encoded, err := typeurl.MarshalAnyToProto(spec)
require.NoError(t, err)
testset := map[string]*containers.Container{}
@@ -178,11 +177,11 @@ func TestContainersCreateUpdateDelete(t *testing.T) {
spec = &specs.Spec{}
)
encoded, err := protobuf.MarshalAnyToProto(spec)
encoded, err := typeurl.MarshalAnyToProto(spec)
require.NoError(t, err)
spec.Annotations = map[string]string{"updated": "true"}
encodedUpdated, err := protobuf.MarshalAnyToProto(spec)
encodedUpdated, err := typeurl.MarshalAnyToProto(spec)
require.NoError(t, err)
for _, testcase := range []struct {

View File

@@ -31,11 +31,10 @@ import (
v2 "github.com/containerd/containerd/v2/core/metrics/cgroups/v2"
v1types "github.com/containerd/containerd/v2/core/metrics/types/v1"
v2types "github.com/containerd/containerd/v2/core/metrics/types/v2"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/prometheus/client_golang/prometheus"
"github.com/containerd/containerd/v2/pkg/protobuf/types"
"github.com/containerd/typeurl/v2"
metrics "github.com/docker/go-metrics"
"github.com/prometheus/client_golang/prometheus"
)
// TestRegressionIssue6772 should not have dead-lock when Collect and Add run
@@ -151,7 +150,7 @@ func (t *mockStatT) Namespace() string {
func (t *mockStatT) Stats(context.Context) (*types.Any, error) {
if t.isV1 {
return protobuf.MarshalAnyToProto(&v1types.Metrics{})
return typeurl.MarshalAnyToProto(&v1types.Metrics{})
}
return protobuf.MarshalAnyToProto(&v2types.Metrics{})
return typeurl.MarshalAnyToProto(&v2types.Metrics{})
}

View File

@@ -28,9 +28,6 @@ import (
"strings"
"time"
"github.com/containerd/containerd/v2/pkg/atomicfile"
"github.com/containerd/containerd/v2/pkg/dialer"
"github.com/containerd/ttrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
@@ -40,6 +37,8 @@ import (
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/v2/core/events/exchange"
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/pkg/atomicfile"
"github.com/containerd/containerd/v2/pkg/dialer"
"github.com/containerd/containerd/v2/pkg/identifiers"
"github.com/containerd/containerd/v2/pkg/protobuf"
ptypes "github.com/containerd/containerd/v2/pkg/protobuf/types"
@@ -47,6 +46,8 @@ import (
"github.com/containerd/containerd/v2/pkg/timeout"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl/v2"
)
const (
@@ -564,7 +565,7 @@ func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime
Stderr: opts.IO.Stderr,
Terminal: opts.IO.Terminal,
Checkpoint: opts.Checkpoint,
Options: protobuf.FromAny(topts),
Options: typeurl.MarshalProto(topts),
}
for _, m := range opts.Rootfs {
request.Rootfs = append(request.Rootfs, &types.Mount{

View File

@@ -26,10 +26,6 @@ import (
"strings"
"sync"
"github.com/containerd/log"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
"github.com/containerd/containerd/v2/core/containers"
"github.com/containerd/containerd/v2/core/events/exchange"
"github.com/containerd/containerd/v2/core/metadata"
@@ -37,11 +33,14 @@ import (
"github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/internal/cleanup"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/protobuf"
shimbinary "github.com/containerd/containerd/v2/pkg/shim"
"github.com/containerd/containerd/v2/pkg/timeout"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/version"
"github.com/containerd/log"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
"github.com/containerd/typeurl/v2"
)
// ShimConfig for the shim
@@ -256,7 +255,7 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
ttrpcAddress: m.containerdTTRPCAddress,
env: m.env,
})
shim, err := b.Start(ctx, protobuf.FromAny(topts), func() {
shim, err := b.Start(ctx, typeurl.MarshalProto(topts), func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)

View File

@@ -37,7 +37,6 @@ import (
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/internal/cleanup"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/containerd/v2/pkg/protobuf/proto"
"github.com/containerd/containerd/v2/pkg/timeout"
"github.com/containerd/containerd/v2/plugins"
@@ -272,7 +271,7 @@ func (m *TaskManager) validateRuntimeFeatures(ctx context.Context, opts runtime.
topts = opts.RuntimeOptions
}
pInfo, err := m.PluginInfo(ctx, &apitypes.RuntimeRequest{RuntimePath: opts.Runtime, Options: protobuf.FromAny(topts)})
pInfo, err := m.PluginInfo(ctx, &apitypes.RuntimeRequest{RuntimePath: opts.Runtime, Options: typeurl.MarshalProto(topts)})
if err != nil {
return fmt.Errorf("runtime info: %w", err)
}

View File

@@ -27,20 +27,20 @@ import (
func ToProto(sandbox *Sandbox) *types.Sandbox {
extensions := make(map[string]*gogo_types.Any)
for k, v := range sandbox.Extensions {
extensions[k] = protobuf.FromAny(v)
extensions[k] = typeurl.MarshalProto(v)
}
return &types.Sandbox{
SandboxID: sandbox.ID,
Runtime: &types.Sandbox_Runtime{
Name: sandbox.Runtime.Name,
Options: protobuf.FromAny(sandbox.Runtime.Options),
Options: typeurl.MarshalProto(sandbox.Runtime.Options),
},
Sandboxer: sandbox.Sandboxer,
Labels: sandbox.Labels,
CreatedAt: protobuf.ToTimestamp(sandbox.CreatedAt),
UpdatedAt: protobuf.ToTimestamp(sandbox.UpdatedAt),
Extensions: extensions,
Spec: protobuf.FromAny(sandbox.Spec),
Spec: typeurl.MarshalProto(sandbox.Spec),
}
}

View File

@@ -26,6 +26,7 @@ import (
"github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/errdefs"
"github.com/containerd/typeurl/v2"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"google.golang.org/protobuf/types/known/anypb"
)
@@ -206,17 +207,17 @@ func (s *remoteSandboxController) Update(
}
func toAPISandbox(sb sandbox.Sandbox) (types.Sandbox, error) {
options, err := protobuf.MarshalAnyToProto(sb.Runtime.Options)
options, err := typeurl.MarshalAnyToProto(sb.Runtime.Options)
if err != nil {
return types.Sandbox{}, err
}
spec, err := protobuf.MarshalAnyToProto(sb.Spec)
spec, err := typeurl.MarshalAnyToProto(sb.Spec)
if err != nil {
return types.Sandbox{}, err
}
extensions := make(map[string]*anypb.Any)
for k, v := range sb.Extensions {
pb, err := protobuf.MarshalAnyToProto(v)
pb, err := typeurl.MarshalAnyToProto(v)
if err != nil {
return types.Sandbox{}, err
}

View File

@@ -24,7 +24,6 @@ import (
streamingapi "github.com/containerd/containerd/api/services/streaming/v1"
"github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/errdefs"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl/v2"
@@ -82,7 +81,7 @@ func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Strea
if err != nil {
return nil, err
}
err = stream.Send(protobuf.FromAny(a))
err = stream.Send(typeurl.MarshalProto(a))
if err != nil {
if !errors.Is(err, io.EOF) {
err = errdefs.FromGRPC(err)
@@ -108,7 +107,7 @@ type clientStream struct {
}
func (cs *clientStream) Send(a typeurl.Any) (err error) {
err = cs.s.Send(protobuf.FromAny(a))
err = cs.s.Send(typeurl.MarshalProto(a))
if !errors.Is(err, io.EOF) {
err = errdefs.FromGRPC(err)
}