Signed-off-by: John Howard <jhoward@microsoft.com>
This commit is contained in:
John Howard
2019-03-25 11:11:17 -07:00
parent 68c44f8cc8
commit e16e995939
10 changed files with 161 additions and 96 deletions

View File

@@ -7,9 +7,14 @@ func logOperationBegin(ctx logrus.Fields, msg string) {
}
func logOperationEnd(ctx logrus.Fields, msg string, err error) {
// Copy the log and fields first.
log := logrus.WithFields(ctx)
if err == nil {
logrus.WithFields(ctx).Debug(msg)
log.Debug(msg)
} else {
logrus.WithFields(ctx).WithError(err).Error(msg)
// Edit only the copied field data to avoid race conditions on the
// write.
log.Data[logrus.ErrorKey] = err
log.Error(msg)
}
}

View File

@@ -23,6 +23,9 @@ type Process struct {
callbackNumber uintptr
logctx logrus.Fields
waitBlock chan struct{}
waitError error
}
func newProcess(process hcsProcess, processID int, computeSystem *System) *Process {
@@ -31,10 +34,10 @@ func newProcess(process hcsProcess, processID int, computeSystem *System) *Proce
processID: processID,
system: computeSystem,
logctx: logrus.Fields{
logfields.HCSOperation: "",
logfields.ContainerID: computeSystem.ID(),
logfields.ProcessID: processID,
logfields.ContainerID: computeSystem.ID(),
logfields.ProcessID: processID,
},
waitBlock: make(chan struct{}),
}
}
@@ -88,13 +91,12 @@ func (process *Process) SystemID() string {
}
func (process *Process) logOperationBegin(operation string) {
process.logctx[logfields.HCSOperation] = operation
logOperationBegin(
process.logctx,
"hcsshim::Process - Begin Operation")
operation+" - Begin Operation")
}
func (process *Process) logOperationEnd(err error) {
func (process *Process) logOperationEnd(operation string, err error) {
var result string
if err == nil {
result = "Success"
@@ -104,9 +106,8 @@ func (process *Process) logOperationEnd(err error) {
logOperationEnd(
process.logctx,
"hcsshim::Process - End Operation - "+result,
operation+" - End Operation - "+result,
err)
process.logctx[logfields.HCSOperation] = ""
}
// Signal signals the process with `options`.
@@ -116,7 +117,7 @@ func (process *Process) Signal(options guestrequest.SignalProcessOptions) (err e
operation := "hcsshim::Process::Signal"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
if process.handle == 0 {
return makeProcessError(process, operation, ErrAlreadyClosed, nil)
@@ -148,7 +149,7 @@ func (process *Process) Kill() (err error) {
operation := "hcsshim::Process::Kill"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
if process.handle == 0 {
return makeProcessError(process, operation, ErrAlreadyClosed, nil)
@@ -166,33 +167,47 @@ func (process *Process) Kill() (err error) {
return nil
}
// Wait waits for the process to exit.
// waitBackground waits for the process exit notification. Once received sets
// `process.waitError` (if any) and unblocks all `Wait` and `WaitTimeout` calls.
//
// This MUST be called exactly once per `process.handle` but `Wait` and
// `WaitTimeout` are safe to call multiple times.
func (process *Process) waitBackground() {
process.waitError = waitForNotification(process.callbackNumber, hcsNotificationProcessExited, nil)
close(process.waitBlock)
}
// Wait waits for the process to exit. If the process has already exited returns
// the pervious error (if any).
func (process *Process) Wait() (err error) {
operation := "hcsshim::Process::Wait"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
err = waitForNotification(process.callbackNumber, hcsNotificationProcessExited, nil)
if err != nil {
<-process.waitBlock
if process.waitError != nil {
return makeProcessError(process, operation, err, nil)
}
return nil
}
// WaitTimeout waits for the process to exit or the duration to elapse. It returns
// false if timeout occurs.
// WaitTimeout waits for the process to exit or the duration to elapse. If the
// process has already exited returns the pervious error (if any). If a timeout
// occurs returns `ErrTimeout`.
func (process *Process) WaitTimeout(timeout time.Duration) (err error) {
operation := "hcssshim::Process::WaitTimeout"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
err = waitForNotification(process.callbackNumber, hcsNotificationProcessExited, &timeout)
if err != nil {
return makeProcessError(process, operation, err, nil)
select {
case <-process.waitBlock:
if process.waitError != nil {
return makeProcessError(process, operation, process.waitError, nil)
}
return nil
case <-time.After(timeout):
return makeProcessError(process, operation, ErrTimeout, nil)
}
return nil
}
// ResizeConsole resizes the console of the process.
@@ -202,7 +217,7 @@ func (process *Process) ResizeConsole(width, height uint16) (err error) {
operation := "hcsshim::Process::ResizeConsole"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
if process.handle == 0 {
return makeProcessError(process, operation, ErrAlreadyClosed, nil)
@@ -239,7 +254,7 @@ func (process *Process) Properties() (_ *ProcessStatus, err error) {
operation := "hcsshim::Process::Properties"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
if process.handle == 0 {
return nil, makeProcessError(process, operation, ErrAlreadyClosed, nil)
@@ -275,19 +290,24 @@ func (process *Process) Properties() (_ *ProcessStatus, err error) {
func (process *Process) ExitCode() (_ int, err error) {
operation := "hcsshim::Process::ExitCode"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
properties, err := process.Properties()
if err != nil {
return 0, makeProcessError(process, operation, err, nil)
return -1, makeProcessError(process, operation, err, nil)
}
if properties.Exited == false {
return 0, makeProcessError(process, operation, ErrInvalidProcessState, nil)
return -1, makeProcessError(process, operation, ErrInvalidProcessState, nil)
}
if properties.LastWaitResult != 0 {
return 0, makeProcessError(process, operation, syscall.Errno(properties.LastWaitResult), nil)
logrus.WithFields(logrus.Fields{
logfields.ContainerID: process.SystemID(),
logfields.ProcessID: process.processID,
"wait-result": properties.LastWaitResult,
}).Warn("hcsshim::Process::ExitCode - Non-zero last wait result")
return -1, nil
}
return int(properties.ExitCode), nil
@@ -302,7 +322,7 @@ func (process *Process) Stdio() (_ io.WriteCloser, _ io.ReadCloser, _ io.ReadClo
operation := "hcsshim::Process::Stdio"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
if process.handle == 0 {
return nil, nil, nil, makeProcessError(process, operation, ErrAlreadyClosed, nil)
@@ -346,7 +366,7 @@ func (process *Process) CloseStdin() (err error) {
operation := "hcsshim::Process::CloseStdin"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
if process.handle == 0 {
return makeProcessError(process, operation, ErrAlreadyClosed, nil)
@@ -384,7 +404,7 @@ func (process *Process) Close() (err error) {
operation := "hcsshim::Process::Close"
process.logOperationBegin(operation)
defer func() { process.logOperationEnd(err) }()
defer func() { process.logOperationEnd(operation, err) }()
// Don't double free this
if process.handle == 0 {
@@ -453,7 +473,7 @@ func (process *Process) unregisterCallback() error {
closeChannels(context.channels)
callbackMapLock.Lock()
callbackMap[callbackNumber] = nil
delete(callbackMap, callbackNumber)
callbackMapLock.Unlock()
handle = 0

View File

@@ -43,26 +43,28 @@ type System struct {
callbackNumber uintptr
logctx logrus.Fields
waitBlock chan struct{}
waitError error
}
func newSystem(id string) *System {
return &System{
id: id,
logctx: logrus.Fields{
logfields.HCSOperation: "",
logfields.ContainerID: id,
logfields.ContainerID: id,
},
waitBlock: make(chan struct{}),
}
}
func (computeSystem *System) logOperationBegin(operation string) {
computeSystem.logctx[logfields.HCSOperation] = operation
logOperationBegin(
computeSystem.logctx,
"hcsshim::ComputeSystem - Begin Operation")
operation+" - Begin Operation")
}
func (computeSystem *System) logOperationEnd(err error) {
func (computeSystem *System) logOperationEnd(operation string, err error) {
var result string
if err == nil {
result = "Success"
@@ -72,9 +74,8 @@ func (computeSystem *System) logOperationEnd(err error) {
logOperationEnd(
computeSystem.logctx,
"hcsshim::ComputeSystem - End Operation - "+result,
operation+" - End Operation - "+result,
err)
computeSystem.logctx[logfields.HCSOperation] = ""
}
// CreateComputeSystem creates a new compute system with the given configuration but does not start it.
@@ -83,7 +84,7 @@ func CreateComputeSystem(id string, hcsDocumentInterface interface{}) (_ *System
computeSystem := newSystem(id)
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
hcsDocumentB, err := json.Marshal(hcsDocumentInterface)
if err != nil {
@@ -124,6 +125,8 @@ func CreateComputeSystem(id string, hcsDocumentInterface interface{}) (_ *System
return nil, makeSystemError(computeSystem, operation, hcsDocument, err, events)
}
go computeSystem.waitBackground()
return computeSystem, nil
}
@@ -135,9 +138,9 @@ func OpenComputeSystem(id string) (_ *System, err error) {
computeSystem.logOperationBegin(operation)
defer func() {
if IsNotExist(err) {
computeSystem.logOperationEnd(nil)
computeSystem.logOperationEnd(operation, nil)
} else {
computeSystem.logOperationEnd(err)
computeSystem.logOperationEnd(operation, err)
}
}()
@@ -156,6 +159,7 @@ func OpenComputeSystem(id string) (_ *System, err error) {
if err = computeSystem.registerCallback(); err != nil {
return nil, makeSystemError(computeSystem, operation, "", err, nil)
}
go computeSystem.waitBackground()
return computeSystem, nil
}
@@ -163,12 +167,10 @@ func OpenComputeSystem(id string) (_ *System, err error) {
// GetComputeSystems gets a list of the compute systems on the system that match the query
func GetComputeSystems(q schema1.ComputeSystemQuery) (_ []schema1.ContainerProperties, err error) {
operation := "hcsshim::GetComputeSystems"
fields := logrus.Fields{
logfields.HCSOperation: operation,
}
fields := logrus.Fields{}
logOperationBegin(
fields,
"hcsshim::ComputeSystem - Begin Operation")
operation+" - Begin Operation")
defer func() {
var result string
@@ -180,7 +182,7 @@ func GetComputeSystems(q schema1.ComputeSystemQuery) (_ []schema1.ContainerPrope
logOperationEnd(
fields,
"hcsshim::ComputeSystem - End Operation - "+result,
operation+" - End Operation - "+result,
err)
}()
@@ -227,7 +229,7 @@ func (computeSystem *System) Start() (err error) {
operation := "hcsshim::ComputeSystem::Start"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
if computeSystem.handle == 0 {
return makeSystemError(computeSystem, "Start", "", ErrAlreadyClosed, nil)
@@ -285,10 +287,10 @@ func (computeSystem *System) Shutdown() (err error) {
operation := "hcsshim::ComputeSystem::Shutdown"
computeSystem.logOperationBegin(operation)
defer func() {
if IsAlreadyStopped(err) {
computeSystem.logOperationEnd(nil)
if IsAlreadyStopped(err) || IsPending(err) {
computeSystem.logOperationEnd(operation, nil)
} else {
computeSystem.logOperationEnd(err)
computeSystem.logOperationEnd(operation, err)
}
}()
@@ -318,9 +320,9 @@ func (computeSystem *System) Terminate() (err error) {
computeSystem.logOperationBegin(operation)
defer func() {
if IsPending(err) {
computeSystem.logOperationEnd(nil)
computeSystem.logOperationEnd(operation, nil)
} else {
computeSystem.logOperationEnd(err)
computeSystem.logOperationEnd(operation, err)
}
}()
@@ -340,48 +342,65 @@ func (computeSystem *System) Terminate() (err error) {
return nil
}
// Wait synchronously waits for the compute system to shutdown or terminate.
// waitBackground waits for the compute system exit notification. Once received
// sets `computeSystem.waitError` (if any) and unblocks all `Wait`,
// `WaitExpectedError`, and `WaitTimeout` calls.
//
// This MUST be called exactly once per `computeSystem.handle` but `Wait`,
// `WaitExpectedError`, and `WaitTimeout` are safe to call multiple times.
func (computeSystem *System) waitBackground() {
computeSystem.waitError = waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, nil)
close(computeSystem.waitBlock)
}
// Wait synchronously waits for the compute system to shutdown or terminate. If
// the compute system has already exited returns the previous error (if any).
func (computeSystem *System) Wait() (err error) {
operation := "hcsshim::ComputeSystem::Wait"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
err = waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, nil)
if err != nil {
return makeSystemError(computeSystem, "Wait", "", err, nil)
<-computeSystem.waitBlock
if computeSystem.waitError != nil {
return makeSystemError(computeSystem, "Wait", "", computeSystem.waitError, nil)
}
return nil
}
// WaitExpectedError synchronously waits for the compute system to shutdown or
// terminate, and ignores the passed error if it occurs.
// terminate and returns the error (if any) as long as it does not match
// `expected`. If the compute system has already exited returns the previous
// error (if any) as long as it does not match `expected`.
func (computeSystem *System) WaitExpectedError(expected error) (err error) {
operation := "hcsshim::ComputeSystem::WaitExpectedError"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
err = waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, nil)
if err != nil && err != expected {
return makeSystemError(computeSystem, "WaitExpectedError", "", err, nil)
<-computeSystem.waitBlock
if computeSystem.waitError != nil && getInnerError(computeSystem.waitError) != expected {
return makeSystemError(computeSystem, "WaitExpectedError", "", computeSystem.waitError, nil)
}
return nil
}
// WaitTimeout synchronously waits for the compute system to terminate or the duration to elapse.
// If the timeout expires, IsTimeout(err) == true
// WaitTimeout synchronously waits for the compute system to terminate or the
// duration to elapse. If the timeout expires, `IsTimeout(err) == true`. If
// the compute system has already exited returns the previous error (if any).
func (computeSystem *System) WaitTimeout(timeout time.Duration) (err error) {
operation := "hcsshim::ComputeSystem::WaitTimeout"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
err = waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, &timeout)
if err != nil {
return makeSystemError(computeSystem, "WaitTimeout", "", err, nil)
select {
case <-computeSystem.waitBlock:
if computeSystem.waitError != nil {
return makeSystemError(computeSystem, "WaitTimeout", "", computeSystem.waitError, nil)
}
return nil
case <-time.After(timeout):
return makeSystemError(computeSystem, "WaitTimeout", "", ErrTimeout, nil)
}
return nil
}
func (computeSystem *System) Properties(types ...schema1.PropertyType) (_ *schema1.ContainerProperties, err error) {
@@ -390,7 +409,7 @@ func (computeSystem *System) Properties(types ...schema1.PropertyType) (_ *schem
operation := "hcsshim::ComputeSystem::Properties"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
queryj, err := json.Marshal(schema1.PropertyQuery{types})
if err != nil {
@@ -429,7 +448,7 @@ func (computeSystem *System) Pause() (err error) {
operation := "hcsshim::ComputeSystem::Pause"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
if computeSystem.handle == 0 {
return makeSystemError(computeSystem, "Pause", "", ErrAlreadyClosed, nil)
@@ -454,7 +473,7 @@ func (computeSystem *System) Resume() (err error) {
operation := "hcsshim::ComputeSystem::Resume"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
if computeSystem.handle == 0 {
return makeSystemError(computeSystem, "Resume", "", ErrAlreadyClosed, nil)
@@ -479,7 +498,7 @@ func (computeSystem *System) CreateProcess(c interface{}) (_ *Process, err error
operation := "hcsshim::ComputeSystem::CreateProcess"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
var (
processInfo hcsProcessInformation
@@ -524,6 +543,7 @@ func (computeSystem *System) CreateProcess(c interface{}) (_ *Process, err error
if err = process.registerCallback(); err != nil {
return nil, makeSystemError(computeSystem, "CreateProcess", "", err, nil)
}
go process.waitBackground()
return process, nil
}
@@ -539,7 +559,7 @@ func (computeSystem *System) OpenProcess(pid int) (_ *Process, err error) {
operation := "hcsshim::ComputeSystem::OpenProcess"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
var (
processHandle hcsProcess
@@ -562,6 +582,7 @@ func (computeSystem *System) OpenProcess(pid int) (_ *Process, err error) {
if err = process.registerCallback(); err != nil {
return nil, makeSystemError(computeSystem, "OpenProcess", "", err, nil)
}
go process.waitBackground()
return process, nil
}
@@ -573,7 +594,7 @@ func (computeSystem *System) Close() (err error) {
operation := "hcsshim::ComputeSystem::Close"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
// Don't double free this
if computeSystem.handle == 0 {
@@ -645,7 +666,7 @@ func (computeSystem *System) unregisterCallback() error {
closeChannels(context.channels)
callbackMapLock.Lock()
callbackMap[callbackNumber] = nil
delete(callbackMap, callbackNumber)
callbackMapLock.Unlock()
handle = 0
@@ -660,7 +681,7 @@ func (computeSystem *System) Modify(config interface{}) (err error) {
operation := "hcsshim::ComputeSystem::Modify"
computeSystem.logOperationBegin(operation)
defer func() { computeSystem.logOperationEnd(err) }()
defer func() { computeSystem.logOperationEnd(operation, err) }()
if computeSystem.handle == 0 {
return makeSystemError(computeSystem, "Modify", "", ErrAlreadyClosed, nil)

View File

@@ -17,6 +17,11 @@ func processAsyncHcsResult(err error, resultp *uint16, callbackNumber uintptr, e
func waitForNotification(callbackNumber uintptr, expectedNotification hcsNotification, timeout *time.Duration) error {
callbackMapLock.RLock()
if _, ok := callbackMap[callbackNumber]; !ok {
callbackMapLock.RUnlock()
logrus.Errorf("failed to waitForNotification: callbackNumber %d does not exist in callbackMap", callbackNumber)
return ErrHandleClose
}
channels := callbackMap[callbackNumber].channels
callbackMapLock.RUnlock()