Merge pull request #3179 from Random-Liu/publish-error
Return event publish errors
This commit is contained in:
commit
eea500d122
@ -59,6 +59,12 @@ var (
|
|||||||
criuFlag string
|
criuFlag string
|
||||||
systemdCgroupFlag bool
|
systemdCgroupFlag bool
|
||||||
containerdBinaryFlag string
|
containerdBinaryFlag string
|
||||||
|
|
||||||
|
bufPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return bytes.NewBuffer(nil)
|
||||||
|
},
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -275,16 +281,20 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
|
|||||||
}
|
}
|
||||||
cmd := exec.CommandContext(ctx, containerdBinaryFlag, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
|
cmd := exec.CommandContext(ctx, containerdBinaryFlag, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
|
||||||
cmd.Stdin = bytes.NewReader(data)
|
cmd.Stdin = bytes.NewReader(data)
|
||||||
|
b := bufPool.Get().(*bytes.Buffer)
|
||||||
|
defer bufPool.Put(b)
|
||||||
|
cmd.Stdout = b
|
||||||
|
cmd.Stderr = b
|
||||||
c, err := shim.Default.Start(cmd)
|
c, err := shim.Default.Start(cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
status, err := shim.Default.Wait(cmd, c)
|
status, err := shim.Default.Wait(cmd, c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Wrapf(err, "failed to publish event: %s", b.String())
|
||||||
}
|
}
|
||||||
if status != 0 {
|
if status != 0 {
|
||||||
return errors.New("failed to publish event")
|
return errors.Errorf("failed to publish event: %s", b.String())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events"
|
||||||
@ -37,6 +38,12 @@ import (
|
|||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var bufPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return bytes.NewBuffer(nil)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// setupSignals creates a new signal handler for all signals and sets the shim as a
|
// setupSignals creates a new signal handler for all signals and sets the shim as a
|
||||||
// sub-reaper so that the container processes are reparented
|
// sub-reaper so that the container processes are reparented
|
||||||
func setupSignals(config Config) (chan os.Signal, error) {
|
func setupSignals(config Config) (chan os.Signal, error) {
|
||||||
@ -106,12 +113,16 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
|
|||||||
}
|
}
|
||||||
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
|
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
|
||||||
cmd.Stdin = bytes.NewReader(data)
|
cmd.Stdin = bytes.NewReader(data)
|
||||||
|
b := bufPool.Get().(*bytes.Buffer)
|
||||||
|
defer bufPool.Put(b)
|
||||||
|
cmd.Stdout = b
|
||||||
|
cmd.Stderr = b
|
||||||
if l.noReaper {
|
if l.noReaper {
|
||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := cmd.Wait(); err != nil {
|
if err := cmd.Wait(); err != nil {
|
||||||
return errors.Wrap(err, "failed to publish event")
|
return errors.Wrapf(err, "failed to publish event: %s", b.String())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -121,10 +132,10 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
|
|||||||
}
|
}
|
||||||
status, err := Default.Wait(cmd, c)
|
status, err := Default.Wait(cmd, c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Wrapf(err, "failed to publish event: %s", b.String())
|
||||||
}
|
}
|
||||||
if status != 0 {
|
if status != 0 {
|
||||||
return errors.New("failed to publish event")
|
return errors.Errorf("failed to publish event: %s", b.String())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user