fix pusher concurrent close channel
Signed-off-by: rongfu.leng <rongfu.leng@daocloud.io>
This commit is contained in:
		| @@ -24,6 +24,7 @@ import ( | |||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"net/url" | 	"net/url" | ||||||
| 	"strings" | 	"strings" | ||||||
|  | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/containerd/containerd/content" | 	"github.com/containerd/containerd/content" | ||||||
| @@ -322,6 +323,7 @@ type pushWriter struct { | |||||||
|  |  | ||||||
| 	pipeC     chan *io.PipeWriter | 	pipeC     chan *io.PipeWriter | ||||||
| 	respC     chan *http.Response | 	respC     chan *http.Response | ||||||
|  | 	closeOnce sync.Once | ||||||
| 	errC      chan error | 	errC      chan error | ||||||
|  |  | ||||||
| 	isManifest bool | 	isManifest bool | ||||||
| @@ -398,14 +400,9 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { | |||||||
| func (pw *pushWriter) Close() error { | func (pw *pushWriter) Close() error { | ||||||
| 	// Ensure pipeC is closed but handle `Close()` being | 	// Ensure pipeC is closed but handle `Close()` being | ||||||
| 	// called multiple times without panicking | 	// called multiple times without panicking | ||||||
| 	select { | 	pw.closeOnce.Do(func() { | ||||||
| 	case _, ok := <-pw.pipeC: |  | ||||||
| 		if ok { |  | ||||||
| 		close(pw.pipeC) | 		close(pw.pipeC) | ||||||
| 		} | 	}) | ||||||
| 	default: |  | ||||||
| 		close(pw.pipeC) |  | ||||||
| 	} |  | ||||||
| 	if pw.pipe != nil { | 	if pw.pipe != nil { | ||||||
| 		status, err := pw.tracker.GetStatus(pw.ref) | 		status, err := pw.tracker.GetStatus(pw.ref) | ||||||
| 		if err == nil && !status.Committed { | 		if err == nil && !status.Committed { | ||||||
|   | |||||||
| @@ -293,7 +293,7 @@ func Test_dockerPusher_push(t *testing.T) { | |||||||
| 		dp               dockerPusher | 		dp               dockerPusher | ||||||
| 		dockerBaseObject string | 		dockerBaseObject string | ||||||
| 		args             args | 		args             args | ||||||
| 		checkerFunc      func(writer pushWriter) bool | 		checkerFunc      func(writer *pushWriter) bool | ||||||
| 		wantErr          error | 		wantErr          error | ||||||
| 	}{ | 	}{ | ||||||
| 		{ | 		{ | ||||||
| @@ -306,7 +306,7 @@ func Test_dockerPusher_push(t *testing.T) { | |||||||
| 				ref:               fmt.Sprintf("manifest-%s", manifestContentDigest.String()), | 				ref:               fmt.Sprintf("manifest-%s", manifestContentDigest.String()), | ||||||
| 				unavailableOnFail: false, | 				unavailableOnFail: false, | ||||||
| 			}, | 			}, | ||||||
| 			checkerFunc: func(writer pushWriter) bool { | 			checkerFunc: func(writer *pushWriter) bool { | ||||||
| 				select { | 				select { | ||||||
| 				case resp := <-writer.respC: | 				case resp := <-writer.respC: | ||||||
| 					// 201 should be the response code when uploading a new manifest | 					// 201 should be the response code when uploading a new manifest | ||||||
| @@ -340,7 +340,7 @@ func Test_dockerPusher_push(t *testing.T) { | |||||||
| 				ref:               fmt.Sprintf("layer-%s", layerContentDigest.String()), | 				ref:               fmt.Sprintf("layer-%s", layerContentDigest.String()), | ||||||
| 				unavailableOnFail: false, | 				unavailableOnFail: false, | ||||||
| 			}, | 			}, | ||||||
| 			checkerFunc: func(writer pushWriter) bool { | 			checkerFunc: func(writer *pushWriter) bool { | ||||||
| 				select { | 				select { | ||||||
| 				case resp := <-writer.respC: | 				case resp := <-writer.respC: | ||||||
| 					// 201 should be the response code when uploading a new blob | 					// 201 should be the response code when uploading a new blob | ||||||
| @@ -379,7 +379,7 @@ func Test_dockerPusher_push(t *testing.T) { | |||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// test whether a proper response has been received after the push operation | 			// test whether a proper response has been received after the push operation | ||||||
| 			assert.True(t, test.checkerFunc(*pw)) | 			assert.True(t, test.checkerFunc(pw)) | ||||||
|  |  | ||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 rongfu.leng
					rongfu.leng