Merge pull request #118362 from linxiulei/zero_copy
Do not copy bytes for cached serializations
This commit is contained in:
		
							
								
								
									
										76
									
								
								staging/src/k8s.io/apimachinery/pkg/runtime/splice.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										76
									
								
								staging/src/k8s.io/apimachinery/pkg/runtime/splice.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,76 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2023 The Kubernetes Authors. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package runtime | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"io" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Splice is the interface that wraps the Splice method. | ||||||
|  | // | ||||||
|  | // Splice moves data from given slice without copying the underlying data for | ||||||
|  | // efficiency purpose. Therefore, the caller should make sure the underlying | ||||||
|  | // data is not changed later. | ||||||
|  | type Splice interface { | ||||||
|  | 	Splice([]byte) | ||||||
|  | 	io.Writer | ||||||
|  | 	Reset() | ||||||
|  | 	Bytes() []byte | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // A spliceBuffer implements Splice and io.Writer interfaces. | ||||||
|  | type spliceBuffer struct { | ||||||
|  | 	raw []byte | ||||||
|  | 	buf *bytes.Buffer | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewSpliceBuffer() Splice { | ||||||
|  | 	return &spliceBuffer{} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Splice implements the Splice interface. | ||||||
|  | func (sb *spliceBuffer) Splice(raw []byte) { | ||||||
|  | 	sb.raw = raw | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Write implements the io.Writer interface. | ||||||
|  | func (sb *spliceBuffer) Write(p []byte) (n int, err error) { | ||||||
|  | 	if sb.buf == nil { | ||||||
|  | 		sb.buf = &bytes.Buffer{} | ||||||
|  | 	} | ||||||
|  | 	return sb.buf.Write(p) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Reset resets the buffer to be empty. | ||||||
|  | func (sb *spliceBuffer) Reset() { | ||||||
|  | 	if sb.buf != nil { | ||||||
|  | 		sb.buf.Reset() | ||||||
|  | 	} | ||||||
|  | 	sb.raw = nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Bytes returns the data held by the buffer. | ||||||
|  | func (sb *spliceBuffer) Bytes() []byte { | ||||||
|  | 	if sb.buf != nil && len(sb.buf.Bytes()) > 0 { | ||||||
|  | 		return sb.buf.Bytes() | ||||||
|  | 	} | ||||||
|  | 	if sb.raw != nil { | ||||||
|  | 		return sb.raw | ||||||
|  | 	} | ||||||
|  | 	return []byte{} | ||||||
|  | } | ||||||
							
								
								
									
										121
									
								
								staging/src/k8s.io/apimachinery/pkg/runtime/splice_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										121
									
								
								staging/src/k8s.io/apimachinery/pkg/runtime/splice_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,121 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2023 The Kubernetes Authors. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package runtime_test | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"testing" | ||||||
|  |  | ||||||
|  | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestSpliceBuffer(t *testing.T) { | ||||||
|  | 	testBytes0 := []byte{0x01, 0x02, 0x03, 0x04} | ||||||
|  | 	testBytes1 := []byte{0x04, 0x03, 0x02, 0x02} | ||||||
|  |  | ||||||
|  | 	testCases := []struct { | ||||||
|  | 		name string | ||||||
|  | 		run  func(sb runtime.Splice, buf *bytes.Buffer) | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			name: "Basic Write", | ||||||
|  | 			run: func(sb runtime.Splice, buf *bytes.Buffer) { | ||||||
|  | 				sb.Write(testBytes0) | ||||||
|  | 				buf.Write(testBytes0) | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "Multiple Writes", | ||||||
|  | 			run: func(sb runtime.Splice, buf *bytes.Buffer) { | ||||||
|  | 				for _, b := range testBytes0 { | ||||||
|  | 					sb.Write([]byte{b}) | ||||||
|  | 					buf.Write([]byte{b}) | ||||||
|  | 				} | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "Write and Reset", | ||||||
|  | 			run: func(sb runtime.Splice, buf *bytes.Buffer) { | ||||||
|  | 				sb.Write(testBytes0) | ||||||
|  | 				buf.Write(testBytes0) | ||||||
|  |  | ||||||
|  | 				sb.Reset() | ||||||
|  | 				buf.Reset() | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "Write/Splice", | ||||||
|  | 			run: func(sb runtime.Splice, buf *bytes.Buffer) { | ||||||
|  | 				sb.Splice(testBytes0) | ||||||
|  | 				buf.Write(testBytes0) | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "Write/Splice and Reset", | ||||||
|  | 			run: func(sb runtime.Splice, buf *bytes.Buffer) { | ||||||
|  | 				sb.Splice(testBytes0) | ||||||
|  | 				buf.Write(testBytes0) | ||||||
|  |  | ||||||
|  | 				sb.Reset() | ||||||
|  | 				buf.Reset() | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "Write/Splice, Reset, Write/Splice", | ||||||
|  | 			run: func(sb runtime.Splice, buf *bytes.Buffer) { | ||||||
|  | 				sb.Splice(testBytes0) | ||||||
|  | 				buf.Write(testBytes0) | ||||||
|  |  | ||||||
|  | 				sb.Reset() | ||||||
|  | 				buf.Reset() | ||||||
|  |  | ||||||
|  | 				sb.Splice(testBytes1) | ||||||
|  | 				buf.Write(testBytes1) | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "Write, Reset, Splice", | ||||||
|  | 			run: func(sb runtime.Splice, buf *bytes.Buffer) { | ||||||
|  | 				sb.Write(testBytes0) | ||||||
|  | 				buf.Write(testBytes0) | ||||||
|  |  | ||||||
|  | 				sb.Reset() | ||||||
|  | 				buf.Reset() | ||||||
|  |  | ||||||
|  | 				sb.Splice(testBytes1) | ||||||
|  | 				buf.Write(testBytes1) | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, tt := range testCases { | ||||||
|  | 		t.Run(tt.name, func(t *testing.T) { | ||||||
|  | 			sb := runtime.NewSpliceBuffer() | ||||||
|  | 			buf := &bytes.Buffer{} | ||||||
|  | 			tt.run(sb, buf) | ||||||
|  |  | ||||||
|  | 			if sb.Bytes() == nil { | ||||||
|  | 				t.Errorf("Unexpected nil") | ||||||
|  | 			} | ||||||
|  | 			if string(sb.Bytes()) != string(buf.Bytes()) { | ||||||
|  | 				t.Errorf("Expected sb.Bytes() == %q, buf.Bytes() == %q", sb.Bytes(), buf.Bytes()) | ||||||
|  | 			} | ||||||
|  | 		}) | ||||||
|  |  | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | } | ||||||
| @@ -219,7 +219,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { | |||||||
| 	var unknown runtime.Unknown | 	var unknown runtime.Unknown | ||||||
| 	internalEvent := &metav1.InternalEvent{} | 	internalEvent := &metav1.InternalEvent{} | ||||||
| 	outEvent := &metav1.WatchEvent{} | 	outEvent := &metav1.WatchEvent{} | ||||||
| 	buf := &bytes.Buffer{} | 	buf := runtime.NewSpliceBuffer() | ||||||
| 	ch := s.Watching.ResultChan() | 	ch := s.Watching.ResultChan() | ||||||
| 	done := req.Context().Done() | 	done := req.Context().Done() | ||||||
|  |  | ||||||
|   | |||||||
| @@ -148,6 +148,10 @@ func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.O | |||||||
| 	if result.err != nil { | 	if result.err != nil { | ||||||
| 		return result.err | 		return result.err | ||||||
| 	} | 	} | ||||||
|  | 	if b, support := w.(runtime.Splice); support { | ||||||
|  | 		b.Splice(result.raw) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
| 	_, err := w.Write(result.raw) | 	_, err := w.Write(result.raw) | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot