diff --git a/windows/runtime.go b/windows/runtime.go index e08beea46..02f365d11 100644 --- a/windows/runtime.go +++ b/windows/runtime.go @@ -194,12 +194,13 @@ func (r *windowsRuntime) Delete(ctx context.Context, t runtime.Task) (*runtime.E wt.cleanup() r.tasks.Delete(ctx, t) - r.emitter.Post(events.WithTopic(ctx, "/tasks/delete"), &eventsapi.TaskDelete{ - ContainerID: wt.id, - Pid: wt.pid, - ExitStatus: rtExit.Status, - ExitedAt: rtExit.Timestamp, - }) + r.emitter.Post(events.WithTopic(ctx, runtime.TaskDeleteEventTopic), + &eventsapi.TaskDelete{ + ContainerID: wt.id, + Pid: wt.pid, + ExitStatus: rtExit.Status, + ExitedAt: rtExit.Timestamp, + }) if needServicing { ns, _ := namespaces.Namespace(ctx) @@ -311,18 +312,19 @@ func (r *windowsRuntime) newTask(ctx context.Context, namespace, id string, spec }) } - r.emitter.Post(events.WithTopic(ctx, "/tasks/create"), &eventsapi.TaskCreate{ - ContainerID: id, - IO: &eventsapi.TaskIO{ - Stdin: io.Stdin, - Stdout: io.Stdout, - Stderr: io.Stderr, - Terminal: io.Terminal, - }, - Pid: t.pid, - Rootfs: rootfs, - // TODO: what should be in Bundle for windows? - }) + r.emitter.Post(events.WithTopic(ctx, runtime.TaskCreateEventTopic), + &eventsapi.TaskCreate{ + ContainerID: id, + IO: &eventsapi.TaskIO{ + Stdin: io.Stdin, + Stdout: io.Stdout, + Stderr: io.Stderr, + Terminal: io.Terminal, + }, + Pid: t.pid, + Rootfs: rootfs, + // TODO: what should be in Bundle for windows? + }) return t, nil } diff --git a/windows/task.go b/windows/task.go index 6fa960282..6289f9cbb 100644 --- a/windows/task.go +++ b/windows/task.go @@ -112,10 +112,11 @@ func (t *task) Start(ctx context.Context) error { return err } - t.emitter.Post(events.WithTopic(ctx, "/tasks/start"), &eventsapi.TaskStart{ - ContainerID: t.id, - Pid: t.pid, - }) + t.emitter.Post(events.WithTopic(ctx, runtime.TaskStartEventTopic), + &eventsapi.TaskStart{ + ContainerID: t.id, + Pid: t.pid, + }) return nil } @@ -129,9 +130,10 @@ func (t *task) Pause(ctx context.Context) error { t.Unlock() } if err == nil { - t.emitter.Post(events.WithTopic(ctx, "/tasks/paused"), &eventsapi.TaskPaused{ - ContainerID: t.id, - }) + t.emitter.Post(events.WithTopic(ctx, runtime.TaskPausedEventTopic), + &eventsapi.TaskPaused{ + ContainerID: t.id, + }) } return errors.Wrap(err, "hcsshim failed to pause task") } @@ -148,9 +150,10 @@ func (t *task) Resume(ctx context.Context) error { t.Unlock() } if err == nil { - t.emitter.Post(events.WithTopic(ctx, "/tasks/resumed"), &eventsapi.TaskResumed{ - ContainerID: t.id, - }) + t.emitter.Post(events.WithTopic(ctx, runtime.TaskResumedEventTopic), + &eventsapi.TaskResumed{ + ContainerID: t.id, + }) } return errors.Wrap(err, "hcsshim failed to resume task") } @@ -192,11 +195,12 @@ func (t *task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt return nil, err } - t.emitter.Post(events.WithTopic(ctx, "/tasks/exec-added"), &eventsapi.TaskExecAdded{ - ContainerID: t.id, - ExecID: id, - Pid: p.Pid(), - }) + t.emitter.Post(events.WithTopic(ctx, runtime.TaskExecAddedEventTopic), + &eventsapi.TaskExecAdded{ + ContainerID: t.id, + ExecID: id, + Pid: p.Pid(), + }) return p, nil } @@ -354,13 +358,14 @@ func (t *task) newProcess(ctx context.Context, id string, conf *hcsshim.ProcessC } wp.exitCode = uint32(ec) - t.emitter.Post(events.WithTopic(ctx, "/tasks/exit"), &eventsapi.TaskExit{ - ContainerID: t.id, - ID: id, - Pid: pid, - ExitStatus: wp.exitCode, - ExitedAt: wp.exitTime, - }) + t.emitter.Post(events.WithTopic(ctx, runtime.TaskExitEventTopic), + &eventsapi.TaskExit{ + ContainerID: t.id, + ID: id, + Pid: pid, + ExitStatus: wp.exitCode, + ExitedAt: wp.exitTime, + }) close(wp.exitCh) // Ensure io's are closed