diff --git a/pkg/transfer/streaming/stream_test.go b/pkg/transfer/streaming/stream_test.go index c42d7ade9..f2a905474 100644 --- a/pkg/transfer/streaming/stream_test.go +++ b/pkg/transfer/streaming/stream_test.go @@ -32,88 +32,75 @@ func FuzzSendAndReceive(f *testing.F) { f.Add(bytes.Repeat([]byte{0}, windowSize+1)) f.Add([]byte("hello")) f.Add(bytes.Repeat([]byte("hello"), windowSize+1)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() f.Fuzz(func(t *testing.T, expected []byte) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - rs, ws := pipeStream() - r, w := io.Pipe() - SendStream(ctx, r, ws) - or := ReceiveStream(ctx, rs) - - go func() { - io.Copy(w, bytes.NewBuffer(expected)) - w.Close() - }() - - actual, err := io.ReadAll(or) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(expected, actual) { - t.Fatalf("received bytes are not equal\n\tactual: %v\n\texpected:%v", actual, expected) - } - }) -} -func FuzzSendAndReceiveChain(f *testing.F) { - f.Add([]byte{}) - f.Add([]byte{0}) - f.Add(bytes.Repeat([]byte{0}, windowSize+1)) - f.Add([]byte("hello")) - f.Add(bytes.Repeat([]byte("hello"), windowSize+1)) - f.Fuzz(func(t *testing.T, expected []byte) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - r, w := io.Pipe() - - or := chainStreams(ctx, chainStreams(ctx, chainStreams(ctx, r))) - - go func() { - io.Copy(w, bytes.NewBuffer(expected)) - w.Close() - }() - - actual, err := io.ReadAll(or) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(expected, actual) { - t.Fatalf("received bytes are not equal\n\tactual: %v\n\texpected:%v", actual, expected) - } + runSendAndReceiveFuzz(ctx, t, expected) + runSendAndReceiveChainFuzz(ctx, t, expected) + runWriterFuzz(ctx, t, expected) }) } -func FuzzWriter(f *testing.F) { - f.Add([]byte{}) - f.Add([]byte{0}) - f.Add(bytes.Repeat([]byte{0}, windowSize+1)) - f.Add([]byte("hello")) - f.Add(bytes.Repeat([]byte("hello"), windowSize+1)) - f.Fuzz(func(t *testing.T, expected []byte) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func runSendAndReceiveFuzz(ctx context.Context, t *testing.T, expected []byte) { + rs, ws := pipeStream() + r, w := io.Pipe() + SendStream(ctx, r, ws) + or := ReceiveStream(ctx, rs) - rs, ws := pipeStream() - wc := WriteByteStream(ctx, ws) - or := ReceiveStream(ctx, rs) + go func() { + io.Copy(w, bytes.NewBuffer(expected)) + w.Close() + }() - go func() { - io.Copy(wc, bytes.NewBuffer(expected)) - wc.Close() - }() + actual, err := io.ReadAll(or) + if err != nil { + t.Fatal(err) + } - actual, err := io.ReadAll(or) - if err != nil { - t.Fatal(err) - } + if !bytes.Equal(expected, actual) { + t.Fatalf("received bytes are not equal\n\tactual: %v\n\texpected:%v", actual, expected) + } +} - if !bytes.Equal(expected, actual) { - t.Fatalf("received bytes are not equal\n\tactual: %v\n\texpected:%v", actual, expected) - } - }) +func runSendAndReceiveChainFuzz(ctx context.Context, t *testing.T, expected []byte) { + r, w := io.Pipe() + + or := chainStreams(ctx, chainStreams(ctx, chainStreams(ctx, r))) + + go func() { + io.Copy(w, bytes.NewBuffer(expected)) + w.Close() + }() + + actual, err := io.ReadAll(or) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, actual) { + t.Fatalf("received bytes are not equal\n\tactual: %v\n\texpected:%v", actual, expected) + } +} + +func runWriterFuzz(ctx context.Context, t *testing.T, expected []byte) { + rs, ws := pipeStream() + wc := WriteByteStream(ctx, ws) + or := ReceiveStream(ctx, rs) + + go func() { + io.Copy(wc, bytes.NewBuffer(expected)) + wc.Close() + }() + + actual, err := io.ReadAll(or) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, actual) { + t.Fatalf("received bytes are not equal\n\tactual: %v\n\texpected:%v", actual, expected) + } } func chainStreams(ctx context.Context, r io.Reader) io.Reader {