Merge pull request #8673 from thaJeztah/no_any
avoid "any" as variable name
This commit is contained in:
commit
98b7dfb870
@ -78,11 +78,11 @@ func getEventPayload(r io.Reader) (*types.Any, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var any types.Any
|
var payload types.Any
|
||||||
if err := proto.Unmarshal(data, &any); err != nil {
|
if err := proto.Unmarshal(data, &payload); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &any, nil
|
return &payload, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func connectEvents(address string) (eventsapi.EventsClient, error) {
|
func connectEvents(address string) (eventsapi.EventsClient, error) {
|
||||||
|
11
container.go
11
container.go
@ -255,9 +255,8 @@ func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...N
|
|||||||
}
|
}
|
||||||
for _, m := range mounts {
|
for _, m := range mounts {
|
||||||
if spec.Linux != nil && spec.Linux.MountLabel != "" {
|
if spec.Linux != nil && spec.Linux.MountLabel != "" {
|
||||||
context := label.FormatMountLabel("", spec.Linux.MountLabel)
|
if ml := label.FormatMountLabel("", spec.Linux.MountLabel); ml != "" {
|
||||||
if context != "" {
|
m.Options = append(m.Options, ml)
|
||||||
m.Options = append(m.Options, context)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
request.Rootfs = append(request.Rootfs, &types.Mount{
|
request.Rootfs = append(request.Rootfs, &types.Mount{
|
||||||
@ -288,11 +287,11 @@ func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...N
|
|||||||
}
|
}
|
||||||
request.RuntimePath = info.RuntimePath
|
request.RuntimePath = info.RuntimePath
|
||||||
if info.Options != nil {
|
if info.Options != nil {
|
||||||
any, err := typeurl.MarshalAny(info.Options)
|
o, err := typeurl.MarshalAny(info.Options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
request.Options = protobuf.FromAny(any)
|
request.Options = protobuf.FromAny(o)
|
||||||
}
|
}
|
||||||
t := &task{
|
t := &task{
|
||||||
client: c.client,
|
client: c.client,
|
||||||
@ -455,7 +454,7 @@ func loadFifos(response *tasks.GetResponse) *cio.FIFOSet {
|
|||||||
// we ignore errors here because we don't
|
// we ignore errors here because we don't
|
||||||
// want to remove the directory if it isn't
|
// want to remove the directory if it isn't
|
||||||
// empty
|
// empty
|
||||||
os.Remove(dir)
|
_ = os.Remove(dir)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -58,13 +58,13 @@ func WithCheckpointImage(ctx context.Context, client *Client, c *containers.Cont
|
|||||||
|
|
||||||
// WithCheckpointTask includes the running task
|
// WithCheckpointTask includes the running task
|
||||||
func WithCheckpointTask(ctx context.Context, client *Client, c *containers.Container, index *imagespec.Index, copts *options.CheckpointOptions) error {
|
func WithCheckpointTask(ctx context.Context, client *Client, c *containers.Container, index *imagespec.Index, copts *options.CheckpointOptions) error {
|
||||||
any, err := protobuf.MarshalAnyToProto(copts)
|
opt, err := protobuf.MarshalAnyToProto(copts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
task, err := client.TaskService().Checkpoint(ctx, &tasks.CheckpointTaskRequest{
|
task, err := client.TaskService().Checkpoint(ctx, &tasks.CheckpointTaskRequest{
|
||||||
ContainerID: c.ID,
|
ContainerID: c.ID,
|
||||||
Options: any,
|
Options: opt,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -80,7 +80,7 @@ func WithCheckpointTask(ctx context.Context, client *Client, c *containers.Conta
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
// save copts
|
// save copts
|
||||||
data, err := proto.Marshal(any)
|
data, err := proto.Marshal(opt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -100,8 +100,8 @@ func WithCheckpointTask(ctx context.Context, client *Client, c *containers.Conta
|
|||||||
// WithCheckpointRuntime includes the container runtime info
|
// WithCheckpointRuntime includes the container runtime info
|
||||||
func WithCheckpointRuntime(ctx context.Context, client *Client, c *containers.Container, index *imagespec.Index, copts *options.CheckpointOptions) error {
|
func WithCheckpointRuntime(ctx context.Context, client *Client, c *containers.Container, index *imagespec.Index, copts *options.CheckpointOptions) error {
|
||||||
if c.Runtime.Options != nil && c.Runtime.Options.GetValue() != nil {
|
if c.Runtime.Options != nil && c.Runtime.Options.GetValue() != nil {
|
||||||
any := protobuf.FromAny(c.Runtime.Options)
|
opt := protobuf.FromAny(c.Runtime.Options)
|
||||||
data, err := proto.Marshal(any)
|
data, err := proto.Marshal(opt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -58,18 +58,18 @@ type InfoConfig struct {
|
|||||||
func WithRuntime(name string, options interface{}) NewContainerOpts {
|
func WithRuntime(name string, options interface{}) NewContainerOpts {
|
||||||
return func(ctx context.Context, client *Client, c *containers.Container) error {
|
return func(ctx context.Context, client *Client, c *containers.Container) error {
|
||||||
var (
|
var (
|
||||||
any typeurl.Any
|
opts typeurl.Any
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
if options != nil {
|
if options != nil {
|
||||||
any, err = typeurl.MarshalAny(options)
|
opts, err = typeurl.MarshalAny(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.Runtime = containers.RuntimeInfo{
|
c.Runtime = containers.RuntimeInfo{
|
||||||
Name: name,
|
Name: name,
|
||||||
Options: any,
|
Options: opts,
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -299,7 +299,7 @@ func WithContainerExtension(name string, extension interface{}) NewContainerOpts
|
|||||||
return fmt.Errorf("extension key must not be zero-length: %w", errdefs.ErrInvalidArgument)
|
return fmt.Errorf("extension key must not be zero-length: %w", errdefs.ErrInvalidArgument)
|
||||||
}
|
}
|
||||||
|
|
||||||
any, err := typeurl.MarshalAny(extension)
|
ext, err := typeurl.MarshalAny(extension)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, typeurl.ErrNotFound) {
|
if errors.Is(err, typeurl.ErrNotFound) {
|
||||||
return fmt.Errorf("extension %q is not registered with the typeurl package, see `typeurl.Register`: %w", name, err)
|
return fmt.Errorf("extension %q is not registered with the typeurl package, see `typeurl.Register`: %w", name, err)
|
||||||
@ -310,7 +310,7 @@ func WithContainerExtension(name string, extension interface{}) NewContainerOpts
|
|||||||
if c.Extensions == nil {
|
if c.Extensions == nil {
|
||||||
c.Extensions = make(map[string]typeurl.Any)
|
c.Extensions = make(map[string]typeurl.Any)
|
||||||
}
|
}
|
||||||
c.Extensions[name] = any
|
c.Extensions[name] = ext
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -46,13 +46,13 @@ type eventRemote struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *eventRemote) Publish(ctx context.Context, topic string, event events.Event) error {
|
func (e *eventRemote) Publish(ctx context.Context, topic string, event events.Event) error {
|
||||||
any, err := typeurl.MarshalAny(event)
|
evt, err := typeurl.MarshalAny(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
req := &eventsapi.PublishRequest{
|
req := &eventsapi.PublishRequest{
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
Event: protobuf.FromAny(any),
|
Event: protobuf.FromAny(evt),
|
||||||
}
|
}
|
||||||
if _, err := e.client.Publish(ctx, req); err != nil {
|
if _, err := e.client.Publish(ctx, req); err != nil {
|
||||||
return errdefs.FromGRPC(err)
|
return errdefs.FromGRPC(err)
|
||||||
|
@ -311,9 +311,9 @@ func TestRunPodSandboxAndTeardownCNISlow(t *testing.T) {
|
|||||||
t.Log("Get sandbox container")
|
t.Log("Get sandbox container")
|
||||||
c, err := GetContainer(sb.Id)
|
c, err := GetContainer(sb.Id)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
any, ok := c.Extensions["io.cri-containerd.sandbox.metadata"]
|
md, ok := c.Extensions["io.cri-containerd.sandbox.metadata"]
|
||||||
require.True(t, ok, "sandbox metadata should exist in extension")
|
require.True(t, ok, "sandbox metadata should exist in extension")
|
||||||
i, err := typeurl.UnmarshalAny(any)
|
i, err := typeurl.UnmarshalAny(md)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.IsType(t, &sandbox.Metadata{}, i)
|
require.IsType(t, &sandbox.Metadata{}, i)
|
||||||
metadata, ok := i.(*sandbox.Metadata)
|
metadata, ok := i.(*sandbox.Metadata)
|
||||||
|
@ -337,17 +337,17 @@ func readContainer(container *containers.Container, bkt *bolt.Bucket) error {
|
|||||||
container.Runtime.Name = string(n)
|
container.Runtime.Name = string(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
any, err := boltutil.ReadAny(rbkt, bucketKeyOptions)
|
o, err := boltutil.ReadAny(rbkt, bucketKeyOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
container.Runtime.Options = any
|
container.Runtime.Options = o
|
||||||
case string(bucketKeySpec):
|
case string(bucketKeySpec):
|
||||||
var any types.Any
|
var spec types.Any
|
||||||
if err := proto.Unmarshal(v, &any); err != nil {
|
if err := proto.Unmarshal(v, &spec); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
container.Spec = &any
|
container.Spec = &spec
|
||||||
case string(bucketKeySnapshotKey):
|
case string(bucketKeySnapshotKey):
|
||||||
container.SnapshotKey = string(v)
|
container.SnapshotKey = string(v)
|
||||||
case string(bucketKeySnapshotter):
|
case string(bucketKeySnapshotter):
|
||||||
|
@ -143,12 +143,12 @@ func (c *criService) updateContainerResources(ctx context.Context,
|
|||||||
|
|
||||||
// updateContainerSpec updates container spec.
|
// updateContainerSpec updates container spec.
|
||||||
func updateContainerSpec(ctx context.Context, cntr containerd.Container, spec *runtimespec.Spec) error {
|
func updateContainerSpec(ctx context.Context, cntr containerd.Container, spec *runtimespec.Spec) error {
|
||||||
any, err := typeurl.MarshalAny(spec)
|
s, err := typeurl.MarshalAny(spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal spec %+v: %w", spec, err)
|
return fmt.Errorf("failed to marshal spec %+v: %w", spec, err)
|
||||||
}
|
}
|
||||||
if err := cntr.Update(ctx, func(ctx gocontext.Context, client *containerd.Client, c *containers.Container) error {
|
if err := cntr.Update(ctx, func(ctx gocontext.Context, client *containerd.Client, c *containers.Container) error {
|
||||||
c.Spec = any
|
c.Spec = s
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return fmt.Errorf("failed to update container spec: %w", err)
|
return fmt.Errorf("failed to update container spec: %w", err)
|
||||||
|
@ -281,9 +281,9 @@ func (em *eventMonitor) start() <-chan error {
|
|||||||
ids := em.backOff.getExpiredIDs()
|
ids := em.backOff.getExpiredIDs()
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
queue := em.backOff.deBackOff(id)
|
queue := em.backOff.deBackOff(id)
|
||||||
for i, any := range queue.events {
|
for i, evt := range queue.events {
|
||||||
if err := em.handleEvent(any); err != nil {
|
if err := em.handleEvent(evt); err != nil {
|
||||||
log.L.WithError(err).Errorf("Failed to handle backOff event %+v for %s", any, id)
|
log.L.WithError(err).Errorf("Failed to handle backOff event %+v for %s", evt, id)
|
||||||
em.backOff.reBackOff(id, queue.events[i:], queue.duration)
|
em.backOff.reBackOff(id, queue.events[i:], queue.duration)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -80,9 +80,9 @@ func TestBackOff(t *testing.T) {
|
|||||||
t.Logf("Should be able to check if the container is in backOff state")
|
t.Logf("Should be able to check if the container is in backOff state")
|
||||||
for k, queue := range inputQueues {
|
for k, queue := range inputQueues {
|
||||||
for _, e := range queue.events {
|
for _, e := range queue.events {
|
||||||
any, err := typeurl.MarshalAny(e)
|
evt, err := typeurl.MarshalAny(e)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
key, _, err := convertEvent(any)
|
key, _, err := convertEvent(evt)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, k, key)
|
assert.Equal(t, k, key)
|
||||||
assert.Equal(t, actual.isInBackOff(key), true)
|
assert.Equal(t, actual.isInBackOff(key), true)
|
||||||
|
@ -159,9 +159,9 @@ func TestTypeurlMarshalUnmarshalSandboxMeta(t *testing.T) {
|
|||||||
test.configChange(meta.Config)
|
test.configChange(meta.Config)
|
||||||
}
|
}
|
||||||
|
|
||||||
any, err := typeurl.MarshalAny(meta)
|
md, err := typeurl.MarshalAny(meta)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
data, err := typeurl.UnmarshalAny(any)
|
data, err := typeurl.UnmarshalAny(md)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.IsType(t, &sandboxstore.Metadata{}, data)
|
assert.IsType(t, &sandboxstore.Metadata{}, data)
|
||||||
curMeta, ok := data.(*sandboxstore.Metadata)
|
curMeta, ok := data.(*sandboxstore.Metadata)
|
||||||
|
@ -142,12 +142,12 @@ func (c *criService) updateContainerResources(ctx context.Context,
|
|||||||
|
|
||||||
// updateContainerSpec updates container spec.
|
// updateContainerSpec updates container spec.
|
||||||
func updateContainerSpec(ctx context.Context, cntr containerd.Container, spec *runtimespec.Spec) error {
|
func updateContainerSpec(ctx context.Context, cntr containerd.Container, spec *runtimespec.Spec) error {
|
||||||
any, err := typeurl.MarshalAny(spec)
|
s, err := typeurl.MarshalAny(spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal spec %+v: %w", spec, err)
|
return fmt.Errorf("failed to marshal spec %+v: %w", spec, err)
|
||||||
}
|
}
|
||||||
if err := cntr.Update(ctx, func(ctx gocontext.Context, client *containerd.Client, c *containers.Container) error {
|
if err := cntr.Update(ctx, func(ctx gocontext.Context, client *containerd.Client, c *containers.Container) error {
|
||||||
c.Spec = any
|
c.Spec = s
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return fmt.Errorf("failed to update container spec: %w", err)
|
return fmt.Errorf("failed to update container spec: %w", err)
|
||||||
|
@ -282,9 +282,9 @@ func (em *eventMonitor) start() <-chan error {
|
|||||||
ids := em.backOff.getExpiredIDs()
|
ids := em.backOff.getExpiredIDs()
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
queue := em.backOff.deBackOff(id)
|
queue := em.backOff.deBackOff(id)
|
||||||
for i, any := range queue.events {
|
for i, evt := range queue.events {
|
||||||
if err := em.handleEvent(any); err != nil {
|
if err := em.handleEvent(evt); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to handle backOff event %+v for %s", any, id)
|
logrus.WithError(err).Errorf("Failed to handle backOff event %+v for %s", evt, id)
|
||||||
em.backOff.reBackOff(id, queue.events[i:], queue.duration)
|
em.backOff.reBackOff(id, queue.events[i:], queue.duration)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -80,9 +80,9 @@ func TestBackOff(t *testing.T) {
|
|||||||
t.Logf("Should be able to check if the container is in backOff state")
|
t.Logf("Should be able to check if the container is in backOff state")
|
||||||
for k, queue := range inputQueues {
|
for k, queue := range inputQueues {
|
||||||
for _, e := range queue.events {
|
for _, e := range queue.events {
|
||||||
any, err := typeurl.MarshalAny(e)
|
evt, err := typeurl.MarshalAny(e)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
key, _, err := convertEvent(any)
|
key, _, err := convertEvent(evt)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, k, key)
|
assert.Equal(t, k, key)
|
||||||
assert.Equal(t, actual.isInBackOff(key), true)
|
assert.Equal(t, actual.isInBackOff(key), true)
|
||||||
|
@ -162,9 +162,9 @@ func TestTypeurlMarshalUnmarshalSandboxMeta(t *testing.T) {
|
|||||||
test.configChange(meta.Config)
|
test.configChange(meta.Config)
|
||||||
}
|
}
|
||||||
|
|
||||||
any, err := typeurl.MarshalAny(meta)
|
md, err := typeurl.MarshalAny(meta)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
data, err := typeurl.UnmarshalAny(any)
|
data, err := typeurl.UnmarshalAny(md)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.IsType(t, &sandboxstore.Metadata{}, data)
|
assert.IsType(t, &sandboxstore.Metadata{}, data)
|
||||||
curMeta, ok := data.(*sandboxstore.Metadata)
|
curMeta, ok := data.(*sandboxstore.Metadata)
|
||||||
|
@ -144,9 +144,9 @@ func (iis *ImageExportStream) MarshalAny(ctx context.Context, sm streaming.Strea
|
|||||||
return typeurl.MarshalAny(s)
|
return typeurl.MarshalAny(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (iis *ImageExportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, any typeurl.Any) error {
|
func (iis *ImageExportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, anyType typeurl.Any) error {
|
||||||
var s transfertypes.ImageExportStream
|
var s transfertypes.ImageExportStream
|
||||||
if err := typeurl.UnmarshalTo(any, &s); err != nil {
|
if err := typeurl.UnmarshalTo(anyType, &s); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,9 +84,9 @@ func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.Strea
|
|||||||
return typeurl.MarshalAny(s)
|
return typeurl.MarshalAny(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, any typeurl.Any) error {
|
func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, anyType typeurl.Any) error {
|
||||||
var s transferapi.ImageImportStream
|
var s transferapi.ImageImportStream
|
||||||
if err := typeurl.UnmarshalTo(any, &s); err != nil {
|
if err := typeurl.UnmarshalTo(anyType, &s); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,11 +261,11 @@ func (cc *credCallback) GetCredentials(ctx context.Context, ref, host string) (C
|
|||||||
Host: host,
|
Host: host,
|
||||||
Reference: ref,
|
Reference: ref,
|
||||||
}
|
}
|
||||||
any, err := typeurl.MarshalAny(ar)
|
anyType, err := typeurl.MarshalAny(ar)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Credentials{}, err
|
return Credentials{}, err
|
||||||
}
|
}
|
||||||
if err := cc.stream.Send(any); err != nil {
|
if err := cc.stream.Send(anyType); err != nil {
|
||||||
return Credentials{}, err
|
return Credentials{}, err
|
||||||
}
|
}
|
||||||
resp, err := cc.stream.Recv()
|
resp, err := cc.stream.Recv()
|
||||||
|
@ -53,14 +53,14 @@ func SendStream(ctx context.Context, r io.Reader, stream streaming.Stream) {
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
any, err := stream.Recv()
|
anyType, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
|
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
|
||||||
log.G(ctx).WithError(err).Error("send stream ended without EOF")
|
log.G(ctx).WithError(err).Error("send stream ended without EOF")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
i, err := typeurl.UnmarshalAny(any)
|
i, err := typeurl.UnmarshalAny(anyType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.G(ctx).WithError(err).Error("failed to unmarshal stream object")
|
log.G(ctx).WithError(err).Error("failed to unmarshal stream object")
|
||||||
continue
|
continue
|
||||||
@ -124,13 +124,13 @@ func SendStream(ctx context.Context, r io.Reader, stream streaming.Stream) {
|
|||||||
data := &transferapi.Data{
|
data := &transferapi.Data{
|
||||||
Data: b[:n],
|
Data: b[:n],
|
||||||
}
|
}
|
||||||
any, err := typeurl.MarshalAny(data)
|
anyType, err := typeurl.MarshalAny(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.G(ctx).WithError(err).Errorf("failed to marshal data for send")
|
log.G(ctx).WithError(err).Errorf("failed to marshal data for send")
|
||||||
// TODO: Send error message on stream before close to allow remote side to return error
|
// TODO: Send error message on stream before close to allow remote side to return error
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := stream.Send(any); err != nil {
|
if err := stream.Send(anyType); err != nil {
|
||||||
log.G(ctx).WithError(err).Errorf("send failed")
|
log.G(ctx).WithError(err).Errorf("send failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -149,20 +149,20 @@ func ReceiveStream(ctx context.Context, stream streaming.Stream) io.Reader {
|
|||||||
update := &transferapi.WindowUpdate{
|
update := &transferapi.WindowUpdate{
|
||||||
Update: windowSize,
|
Update: windowSize,
|
||||||
}
|
}
|
||||||
any, err := typeurl.MarshalAny(update)
|
anyType, err := typeurl.MarshalAny(update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.CloseWithError(fmt.Errorf("failed to marshal window update: %w", err))
|
w.CloseWithError(fmt.Errorf("failed to marshal window update: %w", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// check window update error after recv, stream may be complete
|
// check window update error after recv, stream may be complete
|
||||||
if werr = stream.Send(any); werr == nil {
|
if werr = stream.Send(anyType); werr == nil {
|
||||||
window += windowSize
|
window += windowSize
|
||||||
} else if errors.Is(werr, io.EOF) {
|
} else if errors.Is(werr, io.EOF) {
|
||||||
// TODO: Why does send return EOF here
|
// TODO: Why does send return EOF here
|
||||||
werr = nil
|
werr = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
any, err := stream.Recv()
|
anyType, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, io.EOF) {
|
if errors.Is(err, io.EOF) {
|
||||||
err = nil
|
err = nil
|
||||||
@ -176,7 +176,7 @@ func ReceiveStream(ctx context.Context, stream streaming.Stream) io.Reader {
|
|||||||
w.CloseWithError(fmt.Errorf("failed to send window update: %w", werr))
|
w.CloseWithError(fmt.Errorf("failed to send window update: %w", werr))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
i, err := typeurl.UnmarshalAny(any)
|
i, err := typeurl.UnmarshalAny(anyType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.CloseWithError(fmt.Errorf("failed to unmarshal received object: %w", err))
|
w.CloseWithError(fmt.Errorf("failed to unmarshal received object: %w", err))
|
||||||
return
|
return
|
||||||
|
@ -42,14 +42,14 @@ func WriteByteStream(ctx context.Context, stream streaming.Stream) io.WriteClose
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
any, err := stream.Recv()
|
anyType, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
|
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
|
||||||
log.G(ctx).WithError(err).Error("send byte stream ended without EOF")
|
log.G(ctx).WithError(err).Error("send byte stream ended without EOF")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
i, err := typeurl.UnmarshalAny(any)
|
i, err := typeurl.UnmarshalAny(anyType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.G(ctx).WithError(err).Error("failed to unmarshal stream object")
|
log.G(ctx).WithError(err).Error("failed to unmarshal stream object")
|
||||||
continue
|
continue
|
||||||
@ -102,19 +102,19 @@ func (wbs *writeByteStream) Write(p []byte) (n int, err error) {
|
|||||||
max = remaining
|
max = remaining
|
||||||
}
|
}
|
||||||
// TODO: continue
|
// TODO: continue
|
||||||
//remaining = remaining - int32(n)
|
// remaining = remaining - int32(n)
|
||||||
|
|
||||||
data := &transferapi.Data{
|
data := &transferapi.Data{
|
||||||
Data: p[:max],
|
Data: p[:max],
|
||||||
}
|
}
|
||||||
var any typeurl.Any
|
var anyType typeurl.Any
|
||||||
any, err = typeurl.MarshalAny(data)
|
anyType, err = typeurl.MarshalAny(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.G(wbs.ctx).WithError(err).Errorf("failed to marshal data for send")
|
log.G(wbs.ctx).WithError(err).Errorf("failed to marshal data for send")
|
||||||
// TODO: Send error message on stream before close to allow remote side to return error
|
// TODO: Send error message on stream before close to allow remote side to return error
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err = wbs.stream.Send(any); err != nil {
|
if err = wbs.stream.Send(anyType); err != nil {
|
||||||
log.G(wbs.ctx).WithError(err).Errorf("send failed")
|
log.G(wbs.ctx).WithError(err).Errorf("send failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -39,9 +39,9 @@ func FromAny(from typeurl.Any) *anypb.Any {
|
|||||||
|
|
||||||
// MarshalAnyToProto converts an arbitrary interface to github.com/containerd/containerd/protobuf/types.Any.
|
// MarshalAnyToProto converts an arbitrary interface to github.com/containerd/containerd/protobuf/types.Any.
|
||||||
func MarshalAnyToProto(from interface{}) (*anypb.Any, error) {
|
func MarshalAnyToProto(from interface{}) (*anypb.Any, error) {
|
||||||
any, err := typeurl.MarshalAny(from)
|
anyType, err := typeurl.MarshalAny(from)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return FromAny(any), nil
|
return FromAny(anyType), nil
|
||||||
}
|
}
|
||||||
|
@ -110,7 +110,7 @@ func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
any, err := protobuf.MarshalAnyToProto(event)
|
evt, err := protobuf.MarshalAnyToProto(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -119,7 +119,7 @@ func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event
|
|||||||
Timestamp: protobuf.ToTimestamp(time.Now()),
|
Timestamp: protobuf.ToTimestamp(time.Now()),
|
||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
Event: any,
|
Event: evt,
|
||||||
},
|
},
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
}
|
}
|
||||||
|
@ -222,19 +222,19 @@ func WithSandboxSpec(s *oci.Spec, opts ...oci.SpecOpts) NewSandboxOpts {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WithSandboxExtension attaches an extension to sandbox
|
// WithSandboxExtension attaches an extension to sandbox
|
||||||
func WithSandboxExtension(name string, ext interface{}) NewSandboxOpts {
|
func WithSandboxExtension(name string, extension interface{}) NewSandboxOpts {
|
||||||
return func(ctx context.Context, client *Client, s *api.Sandbox) error {
|
return func(ctx context.Context, client *Client, s *api.Sandbox) error {
|
||||||
if s.Extensions == nil {
|
if s.Extensions == nil {
|
||||||
s.Extensions = make(map[string]typeurl.Any)
|
s.Extensions = make(map[string]typeurl.Any)
|
||||||
}
|
}
|
||||||
|
|
||||||
any, err := typeurl.MarshalAny(ext)
|
ext, err := typeurl.MarshalAny(extension)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal sandbox extension: %w", err)
|
return fmt.Errorf("failed to marshal sandbox extension: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.Extensions[name] = any
|
s.Extensions[name] = ext
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,7 +94,7 @@ func (s *service) Transfer(ctx context.Context, req *transferapi.TransferRequest
|
|||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
|
||||||
pf := func(p transfer.Progress) {
|
pf := func(p transfer.Progress) {
|
||||||
any, err := typeurl.MarshalAny(&transferTypes.Progress{
|
progress, err := typeurl.MarshalAny(&transferTypes.Progress{
|
||||||
Event: p.Event,
|
Event: p.Event,
|
||||||
Name: p.Name,
|
Name: p.Name,
|
||||||
Parents: p.Parents,
|
Parents: p.Parents,
|
||||||
@ -105,7 +105,7 @@ func (s *service) Transfer(ctx context.Context, req *transferapi.TransferRequest
|
|||||||
log.G(ctx).WithError(err).Warnf("event could not be marshaled: %v/%v", p.Event, p.Name)
|
log.G(ctx).WithError(err).Warnf("event could not be marshaled: %v/%v", p.Event, p.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := stream.Send(any); err != nil {
|
if err := stream.Send(progress); err != nil {
|
||||||
log.G(ctx).WithError(err).Warnf("event not sent: %v/%v", p.Event, p.Name)
|
log.G(ctx).WithError(err).Warnf("event not sent: %v/%v", p.Event, p.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
12
task.go
12
task.go
@ -365,7 +365,7 @@ func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreat
|
|||||||
i.Close()
|
i.Close()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
any, err := protobuf.MarshalAnyToProto(spec)
|
pSpec, err := protobuf.MarshalAnyToProto(spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -377,7 +377,7 @@ func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreat
|
|||||||
Stdin: cfg.Stdin,
|
Stdin: cfg.Stdin,
|
||||||
Stdout: cfg.Stdout,
|
Stdout: cfg.Stdout,
|
||||||
Stderr: cfg.Stderr,
|
Stderr: cfg.Stderr,
|
||||||
Spec: any,
|
Spec: pSpec,
|
||||||
}
|
}
|
||||||
if _, err := t.client.TaskService().Exec(ctx, request); err != nil {
|
if _, err := t.client.TaskService().Exec(ctx, request); err != nil {
|
||||||
i.Cancel()
|
i.Cancel()
|
||||||
@ -465,11 +465,11 @@ func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Imag
|
|||||||
}
|
}
|
||||||
request.ParentCheckpoint = i.ParentCheckpoint.String()
|
request.ParentCheckpoint = i.ParentCheckpoint.String()
|
||||||
if i.Options != nil {
|
if i.Options != nil {
|
||||||
any, err := protobuf.MarshalAnyToProto(i.Options)
|
o, err := protobuf.MarshalAnyToProto(i.Options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
request.Options = any
|
request.Options = o
|
||||||
}
|
}
|
||||||
|
|
||||||
status, err := t.Status(ctx)
|
status, err := t.Status(ctx)
|
||||||
@ -550,11 +550,11 @@ func (t *task) Update(ctx context.Context, opts ...UpdateTaskOpts) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if i.Resources != nil {
|
if i.Resources != nil {
|
||||||
any, err := typeurl.MarshalAny(i.Resources)
|
r, err := typeurl.MarshalAny(i.Resources)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
request.Resources = protobuf.FromAny(any)
|
request.Resources = protobuf.FromAny(r)
|
||||||
}
|
}
|
||||||
if i.Annotations != nil {
|
if i.Annotations != nil {
|
||||||
request.Annotations = i.Annotations
|
request.Annotations = i.Annotations
|
||||||
|
Loading…
Reference in New Issue
Block a user